Author: gaohoward
Date: 2010-11-08 09:22:13 -0500 (Mon, 08 Nov 2010)
New Revision: 9852
Added:
trunk/src/main/org/hornetq/api/jms/management/JMSConnectionInfo.java
trunk/src/main/org/hornetq/api/jms/management/JMSConsumerInfo.java
trunk/src/main/org/hornetq/api/jms/management/JMSSessionInfo.java
trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessage.java
Modified:
trunk/src/main/org/hornetq/api/core/client/ClientSession.java
trunk/src/main/org/hornetq/api/jms/management/DestinationControl.java
trunk/src/main/org/hornetq/api/jms/management/JMSQueueControl.java
trunk/src/main/org/hornetq/api/jms/management/JMSServerControl.java
trunk/src/main/org/hornetq/api/jms/management/SubscriptionInfo.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
trunk/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
trunk/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
trunk/src/main/org/hornetq/core/server/HornetQServer.java
trunk/src/main/org/hornetq/core/server/Queue.java
trunk/src/main/org/hornetq/core/server/ServerConsumer.java
trunk/src/main/org/hornetq/core/server/ServerSession.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/jms/client/HornetQConnection.java
trunk/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
trunk/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
Log:
HORNETQ-416
Modified: trunk/src/main/org/hornetq/api/core/client/ClientSession.java
===================================================================
--- trunk/src/main/org/hornetq/api/core/client/ClientSession.java 2010-11-06 00:03:12 UTC
(rev 9851)
+++ trunk/src/main/org/hornetq/api/core/client/ClientSession.java 2010-11-08 14:22:13 UTC
(rev 9852)
@@ -562,4 +562,9 @@
*/
void setSendAcknowledgementHandler(SendAcknowledgementHandler handler);
+ /**
+ * Attach any metadata to the session.
+ * @throws HornetQException
+ */
+ void addMetaData(String key, String data) throws HornetQException;
}
Modified: trunk/src/main/org/hornetq/api/jms/management/DestinationControl.java
===================================================================
--- trunk/src/main/org/hornetq/api/jms/management/DestinationControl.java 2010-11-06
00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/api/jms/management/DestinationControl.java 2010-11-08
14:22:13 UTC (rev 9852)
@@ -47,6 +47,16 @@
*/
int getMessageCount() throws Exception;
+ /**
+ * Returns the number of messages that this queue is currently delivering to its
consumers.
+ */
+ int getDeliveringCount();
+
+ /**
+ * Returns the number of messages added to this queue since it was created.
+ */
+ long getMessagesAdded();
+
// Operations ----------------------------------------------------
/**
Copied: trunk/src/main/org/hornetq/api/jms/management/JMSConnectionInfo.java (from rev
9826,
branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSConnectionInfo.java)
===================================================================
--- trunk/src/main/org/hornetq/api/jms/management/JMSConnectionInfo.java
(rev 0)
+++ trunk/src/main/org/hornetq/api/jms/management/JMSConnectionInfo.java 2010-11-08
14:22:13 UTC (rev 9852)
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.api.jms.management;
+
+import org.hornetq.utils.json.JSONArray;
+import org.hornetq.utils.json.JSONObject;
+
+/**
+ * A JMSConnectionInfo
+ *
+ * @author jmesnil
+ *
+ *
+ */
+public class JMSConnectionInfo
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final String connectionID;
+
+ private final String clientAddress;
+
+ private final long creationTime;
+
+ private final String clientID;
+
+ private final String username;
+
+
+ // Static --------------------------------------------------------
+
+ public static JMSConnectionInfo[] from(final String jsonString) throws Exception
+ {
+ JSONArray array = new JSONArray(jsonString);
+ JMSConnectionInfo[] infos = new JMSConnectionInfo[array.length()];
+ for (int i = 0; i < array.length(); i++)
+ {
+ JSONObject obj = array.getJSONObject(i);
+ String cid = obj.isNull("clientID") ? null :
obj.getString("clientID");
+ String uname = obj.isNull("principal") ? null :
obj.getString("principal");
+
+ JMSConnectionInfo info = new
JMSConnectionInfo(obj.getString("connectionID"),
+
obj.getString("clientAddress"),
+
obj.getLong("creationTime"),
+ cid,
+ uname);
+ infos[i] = info;
+ }
+ return infos;
+ }
+
+ // Constructors --------------------------------------------------
+
+ private JMSConnectionInfo(final String connectionID,
+ final String clientAddress,
+ final long creationTime,
+ final String clientID,
+ final String username)
+ {
+ this.connectionID = connectionID;
+ this.clientAddress = clientAddress;
+ this.creationTime = creationTime;
+ this.clientID = clientID;
+ this.username = username;
+ }
+
+ // Public --------------------------------------------------------
+
+ public String getConnectionID()
+ {
+ return connectionID;
+ }
+
+ public String getClientAddress()
+ {
+ return clientAddress;
+ }
+
+ public long getCreationTime()
+ {
+ return creationTime;
+ }
+
+ public String getClientID()
+ {
+ return clientID;
+ }
+
+ public String getUsername()
+ {
+ return username;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Copied: trunk/src/main/org/hornetq/api/jms/management/JMSConsumerInfo.java (from rev 9826,
branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSConsumerInfo.java)
===================================================================
--- trunk/src/main/org/hornetq/api/jms/management/JMSConsumerInfo.java
(rev 0)
+++ trunk/src/main/org/hornetq/api/jms/management/JMSConsumerInfo.java 2010-11-08 14:22:13
UTC (rev 9852)
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.api.jms.management;
+
+import org.hornetq.utils.json.JSONArray;
+import org.hornetq.utils.json.JSONObject;
+
+/**
+ * Helper class to create Java Objects from the
+ * JSON serialization returned by {@link JMSServerControl#listConsumersAsJSON(String)}
and related methods.
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ */
+public class JMSConsumerInfo
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final String consumerID;
+
+ private final String connectionID;
+
+ private final String destinationName;
+
+ private final String destinationType;
+
+ private final boolean browseOnly;
+
+ private final long creationTime;
+
+ private final boolean durable;
+
+ private final String filter;
+
+ // Static --------------------------------------------------------
+
+ /**
+ * Returns an array of SubscriptionInfo corresponding to the JSON serialization
returned
+ * by {@link TopicControl#listAllSubscriptionsAsJSON()} and related methods.
+ */
+ public static JMSConsumerInfo[] from(final String jsonString) throws Exception
+ {
+ JSONArray array = new JSONArray(jsonString);
+ JMSConsumerInfo[] infos = new JMSConsumerInfo[array.length()];
+ for (int i = 0; i < array.length(); i++)
+ {
+ JSONObject sub = array.getJSONObject(i);
+ JMSConsumerInfo info = new
JMSConsumerInfo(sub.getString("consumerID"),
+
sub.getString("connectionID"),
+
sub.getString("destinationName"),
+
sub.getString("destinationType"),
+
sub.getBoolean("browseOnly"),
+
sub.getLong("creationTime"),
+ sub.getBoolean("durable"),
+ sub.optString("filter",
null));
+ infos[i] = info;
+ }
+
+ return infos;
+ }
+
+ // Constructors --------------------------------------------------
+
+ private JMSConsumerInfo(final String consumerID,
+ final String connectionID,
+ final String destinationName,
+ final String destinationType,
+ final boolean browseOnly,
+ final long creationTime,
+ final boolean durable,
+ final String filter)
+ {
+ this.consumerID = consumerID;
+ this.connectionID = connectionID;
+ this.destinationName = destinationName;
+ this.destinationType = destinationType;
+ this.browseOnly = browseOnly;
+ this.creationTime = creationTime;
+ this.durable = durable;
+ this.filter = filter;
+ }
+
+ // Public --------------------------------------------------------
+
+ public String getConsumerID()
+ {
+ return consumerID;
+ }
+
+ public String getConnectionID()
+ {
+ return connectionID;
+ }
+
+ public String getDestinationName()
+ {
+ return destinationName;
+ }
+
+ public String getDestinationType()
+ {
+ return destinationType;
+ }
+
+ public boolean isBrowseOnly()
+ {
+ return browseOnly;
+ }
+
+ public long getCreationTime()
+ {
+ return creationTime;
+ }
+
+ /**
+ * @return the durable
+ */
+ public boolean isDurable()
+ {
+ return durable;
+ }
+
+ public String getFilter()
+ {
+ return filter;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: trunk/src/main/org/hornetq/api/jms/management/JMSQueueControl.java
===================================================================
--- trunk/src/main/org/hornetq/api/jms/management/JMSQueueControl.java 2010-11-06 00:03:12
UTC (rev 9851)
+++ trunk/src/main/org/hornetq/api/jms/management/JMSQueueControl.java 2010-11-08 14:22:13
UTC (rev 9852)
@@ -51,11 +51,6 @@
void setDeadLetterAddress(@Parameter(name = "deadLetterAddress", desc =
"Dead-letter address of the queue") String deadLetterAddress) throws Exception;
/**
- * Returns the number of messages added to this queue since it was created.
- */
- long getMessagesAdded();
-
- /**
* Returns the number of scheduled messages in this queue.
*/
long getScheduledCount();
@@ -66,11 +61,6 @@
int getConsumerCount();
/**
- * Returns the number of messages that this queue is currently delivering to its
consumers.
- */
- int getDeliveringCount();
-
- /**
* returns the selector for the queue
*/
String getSelector();
Modified: trunk/src/main/org/hornetq/api/jms/management/JMSServerControl.java
===================================================================
--- trunk/src/main/org/hornetq/api/jms/management/JMSServerControl.java 2010-11-06
00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/api/jms/management/JMSServerControl.java 2010-11-08
14:22:13 UTC (rev 9852)
@@ -229,12 +229,55 @@
String[] listConnectionIDs() throws Exception;
/**
+ * Lists all the connections connected to this server.
+ * The returned String is a JSON string containing an array of JMSConnectionInfo
objects.
+ *
+ * @see JMSConnectionInfo#from(String)
+ */
+ @Operation(desc = "List all JMS connections")
+ String listConnectionsAsJSON() throws Exception;
+
+ /**
* Lists all the sessions IDs for the specified connection ID.
*/
@Operation(desc = "List the sessions for the given connectionID", impact =
MBeanOperationInfo.INFO)
String[] listSessions(@Parameter(desc = "a connection ID", name =
"connectionID") String connectionID) throws Exception;
/**
+ * Lists all the consumers which belongs to the JMS Connection specified by the
connectionID.
+ * The returned String is a JSON string containing an array of JMSConsumerInfo
objects.
+ *
+ * @see JMSConsumerInfo#from(String)
+ */
+ @Operation(desc = "List all JMS consumers associated to a JMS Connection")
+ String listConsumersAsJSON(@Parameter(desc = "a connection ID", name =
"connectionID") String connectionID) throws Exception;
+
+ /**
+ * Lists all addresses to which the designated server session has sent messages.
+ */
+ @Operation(desc = "Lists all addresses to which the designated session has sent
messages", impact = MBeanOperationInfo.INFO)
+ String[] listTargetDestinations(@Parameter(desc = "a session ID", name =
"sessionID") String sessionID) throws Exception;
+
+ /**
+ * Returns the last sent message's ID from the given session to an address.
+ */
+ @Operation(desc = "Returns the last sent message's ID from the given session
to an address", impact = MBeanOperationInfo.INFO)
+ String getLastSentMessageID(@Parameter(desc = "session name", name =
"sessionID") String sessionID,
+ @Parameter(desc = "address", name =
"address") String address) throws Exception;
+
+ /**
+ * Gets the session's creation time.
+ */
+ @Operation(desc = "Gets the sessions creation time", impact =
MBeanOperationInfo.INFO)
+ String getSessionCreationTime(@Parameter(desc = "session name", name =
"sessionID") String sessionID) throws Exception;
+
+ /**
+ * Lists all the sessions IDs for the specified connection ID.
+ */
+ @Operation(desc = "List the sessions for the given connectionID", impact =
MBeanOperationInfo.INFO)
+ String listSessionsAsJSON(@Parameter(desc = "a connection ID", name =
"connectionID") String connectionID) throws Exception;
+
+ /**
* List all the prepared transaction, sorted by date,
* oldest first, with details, in text format
*/
@@ -247,6 +290,5 @@
*/
@Operation(desc = "List all the prepared transaction, sorted by date, oldest
first, with details, in HTML format", impact = MBeanOperationInfo.INFO)
String listPreparedTransactionDetailsAsHTML() throws Exception;
-
}
Copied: trunk/src/main/org/hornetq/api/jms/management/JMSSessionInfo.java (from rev 9826,
branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSSessionInfo.java)
===================================================================
--- trunk/src/main/org/hornetq/api/jms/management/JMSSessionInfo.java
(rev 0)
+++ trunk/src/main/org/hornetq/api/jms/management/JMSSessionInfo.java 2010-11-08 14:22:13
UTC (rev 9852)
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.api.jms.management;
+
+import org.hornetq.utils.json.JSONArray;
+import org.hornetq.utils.json.JSONException;
+import org.hornetq.utils.json.JSONObject;
+
+/**
+ * A JMSSessionInfo
+ *
+ * @author howard
+ *
+ *
+ */
+public class JMSSessionInfo
+{
+ private final String sessionID;
+
+ private final long creationTime;
+
+ public JMSSessionInfo(String sessionID, long creationTime)
+ {
+ this.sessionID = sessionID;
+ this.creationTime = creationTime;
+ }
+
+ public static JMSSessionInfo[] from(final String jsonString) throws JSONException
+ {
+ JSONArray array = new JSONArray(jsonString);
+ JMSSessionInfo[] infos = new JMSSessionInfo[array.length()];
+ for (int i = 0; i < array.length(); i++)
+ {
+ JSONObject obj = array.getJSONObject(i);
+
+ JMSSessionInfo info = new JMSSessionInfo(obj.getString("sessionID"),
+
obj.getLong("creationTime"));
+ infos[i] = info;
+ }
+ return infos;
+ }
+
+ public String getSessionID()
+ {
+ return sessionID;
+ }
+
+ public long getCreationTime()
+ {
+ return creationTime;
+ }
+}
Modified: trunk/src/main/org/hornetq/api/jms/management/SubscriptionInfo.java
===================================================================
--- trunk/src/main/org/hornetq/api/jms/management/SubscriptionInfo.java 2010-11-06
00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/api/jms/management/SubscriptionInfo.java 2010-11-08
14:22:13 UTC (rev 9852)
@@ -40,6 +40,8 @@
private final int messageCount;
+ private final int deliveringCount;
+
// Static --------------------------------------------------------
/**
@@ -58,7 +60,8 @@
sub.optString("name",
null),
sub.getBoolean("durable"),
sub.optString("selector",
null),
-
sub.getInt("messageCount"));
+
sub.getInt("messageCount"),
+
sub.getInt("deliveringCount"));
infos[i] = info;
}
@@ -72,7 +75,8 @@
final String name,
final boolean durable,
final String selector,
- final int messageCount)
+ final int messageCount,
+ final int deliveringCount)
{
this.queueName = queueName;
this.clientID = clientID;
@@ -80,6 +84,7 @@
this.durable = durable;
this.selector = selector;
this.messageCount = messageCount;
+ this.deliveringCount = deliveringCount;
}
// Public --------------------------------------------------------
@@ -131,6 +136,14 @@
{
return messageCount;
}
+
+ /**
+ * Returns the number of messages currently delivered to this subscription.
+ */
+ public int getDeliveringCount()
+ {
+ return deliveringCount;
+ }
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-11-06 00:03:12
UTC (rev 9851)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-11-08 14:22:13
UTC (rev 9852)
@@ -45,6 +45,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReattachSessionResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.RollbackMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionAddMetaDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionCloseMessage;
@@ -1810,4 +1811,9 @@
}
}
+
+ public void addMetaData(String key, String data) throws HornetQException
+ {
+ channel.sendBlocking(new SessionAddMetaDataMessage(key, data));
+ }
}
Modified: trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010-11-06 00:03:12
UTC (rev 9851)
+++ trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2010-11-08 14:22:13
UTC (rev 9852)
@@ -555,4 +555,9 @@
{
session.setPacketSize(packetSize);
}
+
+ public void addMetaData(String key, String data) throws HornetQException
+ {
+ session.addMetaData(key, data);
+ }
}
Modified: trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2010-11-06
00:03:12 UTC (rev 9851)
+++
trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2010-11-08
14:22:13 UTC (rev 9852)
@@ -1886,4 +1886,13 @@
}
}
+ public String[] listTargetAddresses(String sessionID)
+ {
+ ServerSession session = server.getSessionByID(sessionID);
+ if (session != null) {
+ return session.getTargetAddresses();
+ }
+ return new String[0];
+ }
+
}
Modified: trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java
===================================================================
---
trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-11-06
00:03:12 UTC (rev 9851)
+++
trunk/src/main/org/hornetq/core/protocol/core/ServerSessionPacketHandler.java 2010-11-08
14:22:13 UTC (rev 9852)
@@ -62,6 +62,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.NullResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.RollbackMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionAddMetaDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
@@ -460,6 +461,13 @@
session.requestProducerCredits(message.getAddress(),
message.getCredits());
break;
}
+ case PacketImpl.SESS_ADD_METADATA:
+ {
+ response = new NullResponseMessage();
+ SessionAddMetaDataMessage message = (SessionAddMetaDataMessage)packet;
+ session.addMetaData(message.getKey(), message.getData());
+ break;
+ }
}
}
catch (HornetQXAException e)
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2010-11-06
00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/PacketDecoder.java 2010-11-08
14:22:13 UTC (rev 9852)
@@ -78,6 +78,7 @@
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_SET_TIMEOUT_RESP;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_START;
import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_XA_SUSPEND;
+import static org.hornetq.core.protocol.core.impl.PacketImpl.SESS_ADD_METADATA;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.core.logging.Logger;
@@ -107,6 +108,7 @@
import org.hornetq.core.protocol.core.impl.wireformat.ReplicationResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.RollbackMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionAcknowledgeMessage;
+import org.hornetq.core.protocol.core.impl.wireformat.SessionAddMetaDataMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionBindingQueryMessage;
import
org.hornetq.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage;
import org.hornetq.core.protocol.core.impl.wireformat.SessionCloseMessage;
@@ -486,6 +488,11 @@
packet = new SessionForceConsumerDelivery();
break;
}
+ case SESS_ADD_METADATA:
+ {
+ packet = new SessionAddMetaDataMessage();
+ break;
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2010-11-06 00:03:12
UTC (rev 9851)
+++ trunk/src/main/org/hornetq/core/protocol/core/impl/PacketImpl.java 2010-11-08 14:22:13
UTC (rev 9852)
@@ -182,6 +182,8 @@
public static final byte REPLICATION_SYNC = 103;
+ public static final byte SESS_ADD_METADATA = 104;
+
// Static --------------------------------------------------------
public PacketImpl(final byte type)
Modified: trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-11-06
00:03:12 UTC (rev 9851)
+++
trunk/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-11-08
14:22:13 UTC (rev 9852)
@@ -89,6 +89,10 @@
private volatile boolean executing;
+ private final long creationTime;
+
+ private String clientID;
+
// Constructors
// ---------------------------------------------------------------------------------
@@ -129,6 +133,8 @@
this.client = client;
this.executor = executor;
+
+ this.creationTime = System.currentTimeMillis();
}
// RemotingConnection implementation
@@ -160,6 +166,11 @@
{
return transportConnection.getRemoteAddress();
}
+
+ public long getCreationTime()
+ {
+ return creationTime;
+ }
public synchronized Channel getChannel(final long channelID, final int
confWindowSize)
{
@@ -547,4 +558,14 @@
channel.close();
}
}
+
+ public void setClientID(String cID)
+ {
+ clientID = cID;
+ }
+
+ public String getClientID()
+ {
+ return clientID;
+ }
}
Copied:
trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessage.java
(from rev 9826,
branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessage.java)
===================================================================
---
trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessage.java
(rev 0)
+++
trunk/src/main/org/hornetq/core/protocol/core/impl/wireformat/SessionAddMetaDataMessage.java 2010-11-08
14:22:13 UTC (rev 9852)
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.protocol.core.impl.wireformat;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.protocol.core.impl.PacketImpl;
+
+/**
+ * A SessionAddMetaDataMessage
+ *
+ * @author <a href="mailto:hgao@redhat.com>Howard Gao</a>
+ *
+ *
+ */
+public class SessionAddMetaDataMessage extends PacketImpl
+{
+ private String key;
+ private String data;
+
+ public SessionAddMetaDataMessage()
+ {
+ super(PacketImpl.SESS_ADD_METADATA);
+ }
+
+ public SessionAddMetaDataMessage(String k, String d)
+ {
+ this();
+ key = k;
+ data = d;
+ }
+
+ @Override
+ public void encodeRest(final HornetQBuffer buffer)
+ {
+ buffer.writeString(key);
+ buffer.writeString(data);
+ }
+
+ @Override
+ public void decodeRest(final HornetQBuffer buffer)
+ {
+ key = buffer.readString();
+ data = buffer.readString();
+ }
+
+ @Override
+ public final boolean isRequiresConfirmations()
+ {
+ return false;
+ }
+
+ public String getKey()
+ {
+ return key;
+ }
+
+ public String getData()
+ {
+ return data;
+ }
+
+}
Modified: trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-11-06
00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-11-08
14:22:13 UTC (rev 9852)
@@ -52,6 +52,8 @@
private boolean valid;
private boolean destroyed = false;
+
+ private final long creationTime;
private StompDecoder decoder = new StompDecoder();
@@ -73,6 +75,8 @@
this.transportConnection = transportConnection;
this.manager = manager;
+
+ this.creationTime = System.currentTimeMillis();
}
public void addFailureListener(final FailureListener listener)
@@ -239,6 +243,11 @@
{
return transportConnection.getRemoteAddress();
}
+
+ public long getCreationTime()
+ {
+ return creationTime;
+ }
public Connection getTransportConnection()
{
Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-11-06
00:03:12 UTC (rev 9851)
+++
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2010-11-08
14:22:13 UTC (rev 9852)
@@ -570,4 +570,5 @@
}
}
}
+
}
\ No newline at end of file
Modified: trunk/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/HornetQServer.java 2010-11-06 00:03:12 UTC (rev
9851)
+++ trunk/src/main/org/hornetq/core/server/HornetQServer.java 2010-11-08 14:22:13 UTC (rev
9852)
@@ -159,4 +159,6 @@
void deployBridge(BridgeConfiguration config) throws Exception;
void destroyBridge(String name) throws Exception;
+
+ ServerSession getSessionByID(String sessionID);
}
Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java 2010-11-06 00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/core/server/Queue.java 2010-11-08 14:22:13 UTC (rev 9852)
@@ -157,4 +157,6 @@
void close() throws Exception;
boolean isDirectDeliver();
+
+ SimpleString getAddress();
}
Modified: trunk/src/main/org/hornetq/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerConsumer.java 2010-11-06 00:03:12 UTC
(rev 9851)
+++ trunk/src/main/org/hornetq/core/server/ServerConsumer.java 2010-11-08 14:22:13 UTC
(rev 9852)
@@ -47,6 +47,10 @@
void forceDelivery(long sequence);
void setTransferring(boolean transferring);
+
+ boolean isBrowseOnly();
+
+ long getCreationTime();
}
Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-11-06 00:03:12 UTC (rev
9851)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java 2010-11-08 14:22:13 UTC (rev
9852)
@@ -14,6 +14,7 @@
package org.hornetq.core.server;
import java.util.List;
+import java.util.Set;
import javax.transaction.xa.Xid;
@@ -110,4 +111,16 @@
void close(boolean failed) throws Exception;
void setTransferring(boolean transferring);
+
+ Set<ServerConsumer> getServerConsumers();
+
+ void addMetaData(String key, String data);
+
+ String getMetaData(String key);
+
+ String[] getTargetAddresses();
+
+ String getLastSentMessageID(String address);
+
+ long getCreationTime();
}
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-11-06 00:03:12
UTC (rev 9851)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-11-08 14:22:13
UTC (rev 9852)
@@ -1513,6 +1513,11 @@
});
}
+
+ public ServerSession getSessionByID(String sessionName)
+ {
+ return sessions.get(sessionName);
+ }
// Inner classes
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-11-06 00:03:12 UTC
(rev 9851)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-11-08 14:22:13 UTC
(rev 9852)
@@ -238,6 +238,11 @@
{
return name;
}
+
+ public SimpleString getAddress()
+ {
+ return address;
+ }
public long getID()
{
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-11-06
00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-11-08
14:22:13 UTC (rev 9852)
@@ -126,6 +126,8 @@
*/
private AtomicBoolean writeReady = new AtomicBoolean(true);
+ private final long creationTime;
+
// Constructors
---------------------------------------------------------------------------------
public ServerConsumerImpl(final long id,
@@ -170,6 +172,8 @@
this.callback.addReadyListener(this);
+ this.creationTime = System.currentTimeMillis();
+
if (browseOnly)
{
browserDeliverer = new BrowserDeliverer(messageQueue.iterator());
@@ -188,6 +192,16 @@
return id;
}
+ public boolean isBrowseOnly()
+ {
+ return browseOnly;
+ }
+
+ public long getCreationTime()
+ {
+ return creationTime;
+ }
+
public HandleStatus handle(final MessageReference ref) throws Exception
{
if (availableCredits != null && availableCredits.get() <= 0)
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-11-06 00:03:12
UTC (rev 9851)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-11-08 14:22:13
UTC (rev 9852)
@@ -16,8 +16,10 @@
import static org.hornetq.api.core.management.NotificationType.CONSUMER_CREATED;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -64,6 +66,7 @@
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.spi.core.protocol.SessionCallback;
import org.hornetq.utils.TypedProperties;
+import org.hornetq.utils.UUID;
/*
* Session implementation
@@ -135,7 +138,13 @@
private volatile SimpleString defaultAddress;
private volatile int timeoutSeconds;
+
+ private Map<String, String> metaData;
+ private Map<SimpleString, UUID> targetAddressInfos = new
HashMap<SimpleString, UUID>();
+
+ private long creationTime = System.currentTimeMillis();
+
// Constructors
---------------------------------------------------------------------------------
public ServerSessionImpl(final String name,
@@ -232,6 +241,11 @@
{
return remotingConnection.getID();
}
+
+ public Set<ServerConsumer> getServerConsumers() {
+ Set<ServerConsumer> consumersClone = new
HashSet<ServerConsumer>(consumers.values());
+ return Collections.unmodifiableSet(consumersClone);
+ }
public void removeConsumer(final long consumerID) throws Exception
{
@@ -1173,8 +1187,58 @@
}
postOffice.route(msg, routingContext, direct);
+
+ targetAddressInfos.put(msg.getAddress(), msg.getUserID());
routingContext.clear();
}
+ public void addMetaData(String key, String data)
+ {
+ if (metaData == null)
+ {
+ metaData = new HashMap<String, String>();
+ }
+ metaData.put(key, data);
+ }
+
+ public String getMetaData(String key)
+ {
+ String data = null;
+ if (metaData != null)
+ {
+ data = metaData.get(key);
+ }
+ return data;
+ }
+
+ public String[] getTargetAddresses()
+ {
+ Map<SimpleString, UUID> copy = new HashMap<SimpleString,
UUID>(targetAddressInfos);
+ Iterator<SimpleString> iter = copy.keySet().iterator();
+ int num = copy.keySet().size();
+ String[] addresses = new String[num];
+ int i = 0;
+ while (iter.hasNext())
+ {
+ addresses[i] = iter.next().toString();
+ i++;
+ }
+ return addresses;
+ }
+
+ public String getLastSentMessageID(String address)
+ {
+ UUID id = targetAddressInfos.get(SimpleString.toSimpleString(address));
+ if (id != null)
+ {
+ return id.toString();
+ }
+ return null;
+ }
+
+ public long getCreationTime()
+ {
+ return this.creationTime;
+ }
}
Modified: trunk/src/main/org/hornetq/jms/client/HornetQConnection.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQConnection.java 2010-11-06 00:03:12 UTC
(rev 9851)
+++ trunk/src/main/org/hornetq/jms/client/HornetQConnection.java 2010-11-08 14:22:13 UTC
(rev 9852)
@@ -181,6 +181,16 @@
}
this.clientID = clientID;
+ try
+ {
+ this.addSessionMetaData(initialSession);
+ }
+ catch (HornetQException e)
+ {
+ JMSException ex = new JMSException("Internal error setting metadata
jms-client-id");
+ ex.setLinkedException(e);
+ throw ex;
+ }
justCreated = false;
}
@@ -537,6 +547,8 @@
{
session.start();
}
+
+ this.addSessionMetaData(session);
return jbs;
}
@@ -562,6 +574,8 @@
{
initialSession = sessionFactory.createSession(username, password, false, false,
false, false, 0);
+ addSessionMetaData(initialSession);
+
initialSession.addFailureListener(listener);
}
catch (HornetQException me)
@@ -570,6 +584,15 @@
}
}
+ private void addSessionMetaData(ClientSession session) throws HornetQException
+ {
+ session.addMetaData("jms-session", "");
+ if (clientID != null)
+ {
+ session.addMetaData("jms-client-id", clientID);
+ }
+ }
+
// Inner classes
--------------------------------------------------------------------------------
private static class JMSFailureListener implements SessionFailureListener
Modified: trunk/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2010-11-06
00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2010-11-08
14:22:13 UTC (rev 9852)
@@ -14,8 +14,10 @@
package org.hornetq.jms.management.impl;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.ListenerNotFoundException;
@@ -32,11 +34,20 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.jms.management.ConnectionFactoryControl;
+import org.hornetq.api.jms.management.DestinationControl;
import org.hornetq.api.jms.management.JMSQueueControl;
import org.hornetq.api.jms.management.JMSServerControl;
import org.hornetq.api.jms.management.TopicControl;
+import org.hornetq.core.filter.Filter;
import org.hornetq.core.management.impl.MBeanInfoHelper;
+import org.hornetq.core.server.ServerConsumer;
+import org.hornetq.core.server.ServerSession;
+import org.hornetq.jms.client.HornetQDestination;
+import org.hornetq.jms.client.HornetQQueue;
import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.utils.json.JSONArray;
+import org.hornetq.utils.json.JSONObject;
import org.hornetq.jms.server.impl.JMSFactoryType;
/**
@@ -86,6 +97,38 @@
}
return trimmed;
}
+
+ private static String[] determineJMSDestination(String coreAddress)
+ {
+ String[] result = new String[2]; // destination name & type
+ if (coreAddress.startsWith(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX))
+ {
+ result[0] =
coreAddress.substring(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX.length());
+ result[1] = "queue";
+ }
+ else if (coreAddress.startsWith(HornetQQueue.JMS_TEMP_QUEUE_ADDRESS_PREFIX))
+ {
+ result[0] =
coreAddress.substring(HornetQQueue.JMS_TEMP_QUEUE_ADDRESS_PREFIX.length());
+ result[1] = "tempqueue";
+ }
+ else if (coreAddress.startsWith(HornetQQueue.JMS_TOPIC_ADDRESS_PREFIX))
+ {
+ result[0] =
coreAddress.substring(HornetQQueue.JMS_TOPIC_ADDRESS_PREFIX.length());
+ result[1] = "topic";
+ }
+ else if (coreAddress.startsWith(HornetQQueue.JMS_TEMP_TOPIC_ADDRESS_PREFIX))
+ {
+ result[0] =
coreAddress.substring(HornetQQueue.JMS_TEMP_TOPIC_ADDRESS_PREFIX.length());
+ result[1] = "temptopic";
+ }
+ else
+ {
+ System.out.println("JMSServerControlImpl.determineJMSDestination()" +
coreAddress);
+ // not related to JMS
+ return null;
+ }
+ return result;
+ }
private static List<Pair<TransportConfiguration, TransportConfiguration>>
convertToConnectorPairs(final Object[] liveConnectorsTransportClassNames,
final Object[] liveConnectorTransportParams,
@@ -682,7 +725,116 @@
blockOnIO();
}
}
+
+ public String listConnectionsAsJSON() throws Exception
+ {
+ checkStarted();
+ clearIO();
+
+ try
+ {
+ JSONArray array = new JSONArray();
+
+ Set<RemotingConnection> connections =
server.getHornetQServer().getRemotingService().getConnections();
+
+ Set<ServerSession> sessions = server.getHornetQServer().getSessions();
+
+ Map<Object, ServerSession> jmsSessions = new HashMap<Object,
ServerSession>();
+
+ for (ServerSession session : sessions)
+ {
+ if (session.getMetaData("jms-session") != null)
+ {
+ jmsSessions.put(session.getConnectionID(), session);
+ }
+ }
+
+ for (RemotingConnection connection : connections)
+ {
+ JSONObject obj = new JSONObject();
+ obj.put("connectionID", connection.getID().toString());
+ obj.put("clientAddress", connection.getRemoteAddress());
+ obj.put("creationTime", connection.getCreationTime());
+ obj.put("clientID",
jmsSessions.get(connection.getID()).getMetaData("jms-client-id"));
+ obj.put("principal",
jmsSessions.get(connection.getID()).getUsername());
+ array.put(obj);
+ }
+ return array.toString();
+ }
+ finally
+ {
+ blockOnIO();
+ }
+ }
+
+ public String listConsumersAsJSON(String connectionID) throws Exception
+ {
+ checkStarted();
+
+ clearIO();
+
+ try
+ {
+ JSONArray array = new JSONArray();
+
+ Set<RemotingConnection> connections =
server.getHornetQServer().getRemotingService().getConnections();
+ for (RemotingConnection connection : connections)
+ {
+ if (connectionID.equals(connection.getID().toString()))
+ {
+ List<ServerSession> sessions =
server.getHornetQServer().getSessions(connectionID);
+ for (ServerSession session : sessions)
+ {
+ Set<ServerConsumer> consumers = session.getServerConsumers();
+ for (ServerConsumer consumer : consumers)
+ {
+ JSONObject obj = new JSONObject();
+ obj.put("consumerID", consumer.getID());
+ obj.put("connectionID", connectionID);
+ obj.put("queueName",
consumer.getQueue().getName().toString());
+ obj.put("browseOnly", consumer.isBrowseOnly());
+ obj.put("creationTime", consumer.getCreationTime());
+ // JMS consumer with message filter use the queue's filter
+ Filter queueFilter = consumer.getQueue().getFilter();
+ if (queueFilter != null)
+ {
+ obj.put("filter",
queueFilter.getFilterString().toString());
+ }
+ String[] destinationInfo =
determineJMSDestination(consumer.getQueue().getAddress().toString());
+ if (destinationInfo == null)
+ {
+ continue;
+ }
+ obj.put("destinationName", destinationInfo[0]);
+ obj.put("destinationType", destinationInfo[1]);
+ if (destinationInfo[1].equals("topic")) {
+ try
+ {
+
HornetQDestination.decomposeQueueNameForDurableSubscription(consumer.getQueue().getName().toString());
+ obj.put("durable", true);
+ } catch (IllegalArgumentException e)
+ {
+ obj.put("durable", false);
+ }
+ }
+ else
+ {
+ obj.put("durable", false);
+ }
+ array.put(obj);
+ }
+ }
+ }
+ }
+ return array.toString();
+ }
+ finally
+ {
+ blockOnIO();
+ }
+ }
+
public String[] listSessions(final String connectionID) throws Exception
{
checkStarted();
@@ -816,4 +968,81 @@
}
return list;
}
+
+ public String[] listTargetDestinations(String sessionID) throws Exception
+ {
+ String[] addresses =
server.getHornetQServer().getHornetQServerControl().listTargetAddresses(sessionID);
+ Map<String, DestinationControl> allDests = new HashMap<String,
DestinationControl>();
+
+ Object[] queueControls =
server.getHornetQServer().getManagementService().getResources(JMSQueueControl.class);
+ for (int i = 0; i < queueControls.length; i++)
+ {
+ JMSQueueControl queueControl = (JMSQueueControl)queueControls[i];
+ allDests.put(queueControl.getAddress(), queueControl);
+ }
+
+ Object[] topicControls =
server.getHornetQServer().getManagementService().getResources(TopicControl.class);
+ for (int i = 0; i < topicControls.length; i++)
+ {
+ TopicControl topicControl = (TopicControl)topicControls[i];
+ allDests.put(topicControl.getAddress(), topicControl);
+ }
+
+ List<String> destinations = new ArrayList<String>();
+ for (int i = 0; i < addresses.length; i++)
+ {
+ DestinationControl control = allDests.get(addresses[i]);
+ if (control != null)
+ {
+ destinations.add(control.getAddress());
+ }
+ }
+ return destinations.toArray(new String[0]);
+ }
+
+ public String getLastSentMessageID(String sessionID, String address) throws Exception
+ {
+ ServerSession session = server.getHornetQServer().getSessionByID(sessionID);
+ if (session != null)
+ {
+ return session.getLastSentMessageID(address);
+ }
+ return null;
+ }
+
+ public String getSessionCreationTime(String sessionID) throws Exception
+ {
+ ServerSession session = server.getHornetQServer().getSessionByID(sessionID);
+ if (session != null)
+ {
+ return String.valueOf(session.getCreationTime());
+ }
+ return null;
+ }
+
+ public String listSessionsAsJSON(final String connectionID) throws Exception
+ {
+ checkStarted();
+
+ clearIO();
+
+ JSONArray array = new JSONArray();
+ try
+ {
+ List<ServerSession> sessions =
server.getHornetQServer().getSessions(connectionID);
+ for (ServerSession sess : sessions)
+ {
+ JSONObject obj = new JSONObject();
+ obj.put("sessionID", sess.getName());
+ obj.put("creationTime", sess.getCreationTime());
+ array.put(obj);
+ }
+ }
+ finally
+ {
+ blockOnIO();
+ }
+ return array.toString();
+ }
+
}
Modified: trunk/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java 2010-11-06
00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java 2010-11-08
14:22:13 UTC (rev 9852)
@@ -118,6 +118,28 @@
return getMessageCount(DurabilityType.ALL);
}
+ public int getDeliveringCount()
+ {
+ List<QueueControl> queues = getQueues(DurabilityType.ALL);
+ int count = 0;
+ for (QueueControl queue : queues)
+ {
+ count += queue.getDeliveringCount();
+ }
+ return count;
+ }
+
+ public long getMessagesAdded()
+ {
+ List<QueueControl> queues = getQueues(DurabilityType.ALL);
+ int count = 0;
+ for (QueueControl queue : queues)
+ {
+ count += queue.getMessagesAdded();
+ }
+ return count;
+ }
+
public int getDurableMessageCount()
{
return getMessageCount(DurabilityType.DURABLE);
@@ -315,6 +337,7 @@
info.put("name", subName);
info.put("durable", queue.isDurable());
info.put("messageCount", queue.getMessageCount());
+ info.put("deliveringCount", queue.getDeliveringCount());
array.put(info);
}
Modified: trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java
===================================================================
--- trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java 2010-11-06
00:03:12 UTC (rev 9851)
+++ trunk/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java 2010-11-08
14:22:13 UTC (rev 9852)
@@ -13,6 +13,7 @@
package org.hornetq.spi.core.protocol;
+import java.util.Collection;
import java.util.List;
import org.hornetq.api.core.HornetQBuffer;
@@ -38,6 +39,11 @@
Object getID();
/**
+ * Returns the creation time of the Remoting connection
+ */
+ long getCreationTime();
+
+ /**
* returns a string representation of the remote address of this connection
*
* @return the remote address
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java
===================================================================
---
trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java 2010-11-06
00:03:12 UTC (rev 9851)
+++
trunk/tests/jms-tests/src/org/hornetq/jms/tests/message/MessageHeaderTest.java 2010-11-08
14:22:13 UTC (rev 9852)
@@ -1390,6 +1390,24 @@
// TODO Auto-generated method stub
}
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.client.ClientSession#setClientID(java.lang.String)
+ */
+ public void setClientID(String clientID)
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.api.core.client.ClientSession#addMetaData(java.lang.String,
java.lang.String)
+ */
+ public void addMetaData(String key, String data) throws HornetQException
+ {
+ // TODO Auto-generated method stub
+
+ }
}
}
Modified:
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java 2010-11-06
00:03:12 UTC (rev 9851)
+++
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java 2010-11-08
14:22:13 UTC (rev 9852)
@@ -21,11 +21,23 @@
import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
import junit.framework.Assert;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.jms.HornetQJMSClient;
+import org.hornetq.api.jms.management.JMSConnectionInfo;
+import org.hornetq.api.jms.management.JMSConsumerInfo;
import org.hornetq.api.jms.management.JMSServerControl;
+import org.hornetq.api.jms.management.JMSSessionInfo;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
@@ -35,6 +47,7 @@
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
+import org.hornetq.jms.client.HornetQQueue;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.tests.integration.management.ManagementControlHelper;
import org.hornetq.tests.integration.management.ManagementTestBase;
@@ -139,7 +152,300 @@
{
doListConnectionIDs(NettyAcceptorFactory.class.getName(),
NettyConnectorFactory.class.getName());
}
+
+ public void testListConnectionsAsJSONForNetty() throws Exception
+ {
+ doListConnectionsAsJSON(NettyAcceptorFactory.class.getName(),
NettyConnectorFactory.class.getName());
+ }
+ public void testListConnectionsAsJSONForInVM() throws Exception
+ {
+ doListConnectionsAsJSON(InVMAcceptorFactory.class.getName(),
InVMConnectorFactory.class.getName());
+ }
+
+ public void testListConsumersAsJSON() throws Exception
+ {
+ String queueName = RandomUtil.randomString();
+
+ try
+ {
+ startHornetQServer(NettyAcceptorFactory.class.getName());
+ serverManager.createQueue(false, queueName, null, true, queueName);
+ Queue queue = HornetQJMSClient.createQueue(queueName);
+
+ JMSServerControl control = createManagementControl();
+
+ long startTime = System.currentTimeMillis();
+
+ String jsonStr = control.listConnectionsAsJSON();
+ assertNotNull(jsonStr);
+ JMSConnectionInfo[] infos = JMSConnectionInfo.from(jsonStr);
+ assertEquals(0, infos.length);
+
+ ConnectionFactory cf1 =
JMSUtil.createFactory(NettyConnectorFactory.class.getName(),
+
JMSServerControl2Test.CONNECTION_TTL,
+
JMSServerControl2Test.PING_PERIOD);
+ Connection connection = cf1.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ TemporaryTopic temporaryTopic = session.createTemporaryTopic();
+
+ // create a regular message consumer
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ jsonStr = control.listConnectionsAsJSON();
+ assertNotNull(jsonStr);
+ infos = JMSConnectionInfo.from(jsonStr);
+ assertEquals(1, infos.length);
+ String connectionID = infos[0].getConnectionID();
+
+ String consJsonStr = control.listConsumersAsJSON(connectionID);
+ assertNotNull(consJsonStr);
+ JMSConsumerInfo[] consumerInfos = JMSConsumerInfo.from(consJsonStr);
+ assertEquals(1, consumerInfos.length);
+ JMSConsumerInfo consumerInfo = consumerInfos[0];
+ assertNotNull(consumerInfo.getConsumerID());
+ assertEquals(connectionID, consumerInfo.getConnectionID());
+ assertEquals(queue.getQueueName(), consumerInfo.getDestinationName());
+ assertEquals("queue", consumerInfo.getDestinationType());
+ assertNull(consumerInfo.getFilter());
+ assertEquals(false, consumerInfo.isBrowseOnly());
+ assertEquals(false, consumerInfo.isDurable());
+ assertTrue(startTime <= consumerInfo.getCreationTime() &&
consumerInfo.getCreationTime() <= System.currentTimeMillis());
+
+ consumer.close();
+
+ consJsonStr = control.listConsumersAsJSON(connectionID);
+ assertNotNull(consJsonStr);
+ consumerInfos = JMSConsumerInfo.from(consJsonStr);
+ assertEquals(0, consumerInfos.length);
+
+ // create a queue browser
+ QueueBrowser browser = session.createBrowser(queue);
+ // the server resources are created when the browser starts enumerating
+ browser.getEnumeration();
+
+ consJsonStr = control.listConsumersAsJSON(connectionID);
+ assertNotNull(consJsonStr);
+ System.out.println(consJsonStr);
+ consumerInfos = JMSConsumerInfo.from(consJsonStr);
+ assertEquals(1, consumerInfos.length);
+ consumerInfo = consumerInfos[0];
+ assertNotNull(consumerInfo.getConsumerID());
+ assertEquals(connectionID, consumerInfo.getConnectionID());
+ assertEquals(queue.getQueueName(), consumerInfo.getDestinationName());
+ assertEquals("queue", consumerInfo.getDestinationType());
+ assertNull(consumerInfo.getFilter());
+ assertEquals(true, consumerInfo.isBrowseOnly());
+ assertEquals(false, consumerInfo.isDurable());
+ assertTrue(startTime <= consumerInfo.getCreationTime() &&
consumerInfo.getCreationTime() <= System.currentTimeMillis());
+
+ browser.close();
+
+ // create a regular consumer w/ filter on a temp topic
+ String filter = "color = 'red'";
+ consumer = session.createConsumer(temporaryTopic, filter);
+
+ consJsonStr = control.listConsumersAsJSON(connectionID);
+ assertNotNull(consJsonStr);
+ System.out.println(consJsonStr);
+ consumerInfos = JMSConsumerInfo.from(consJsonStr);
+ assertEquals(1, consumerInfos.length);
+ consumerInfo = consumerInfos[0];
+ assertNotNull(consumerInfo.getConsumerID());
+ assertEquals(connectionID, consumerInfo.getConnectionID());
+ assertEquals(temporaryTopic.getTopicName(), consumerInfo.getDestinationName());
+ assertEquals("temptopic", consumerInfo.getDestinationType());
+ assertEquals(filter, consumerInfo.getFilter());
+ assertEquals(false, consumerInfo.isBrowseOnly());
+ assertEquals(false, consumerInfo.isDurable());
+ assertTrue(startTime <= consumerInfo.getCreationTime() &&
consumerInfo.getCreationTime() <= System.currentTimeMillis());
+
+ consumer.close();
+
+ connection.close();
+ }
+ finally
+ {
+ if (serverManager != null)
+ {
+ serverManager.destroyQueue(queueName);
+ serverManager.stop();
+ }
+
+ if (server != null)
+ {
+ server.stop();
+ }
+ }
+ }
+
+ /**
+ * test for durable subscriber
+ */
+ public void testListConsumersAsJSON2() throws Exception
+ {
+ String topicName = RandomUtil.randomString();
+ String clientID = RandomUtil.randomString();
+ String subName = RandomUtil.randomString();
+
+ try
+ {
+ startHornetQServer(NettyAcceptorFactory.class.getName());
+ serverManager.createTopic(false, topicName, topicName);
+ Topic topic = HornetQJMSClient.createTopic(topicName);
+
+ JMSServerControl control = createManagementControl();
+
+ long startTime = System.currentTimeMillis();
+
+ String jsonStr = control.listConnectionsAsJSON();
+ assertNotNull(jsonStr);
+ JMSConnectionInfo[] infos = JMSConnectionInfo.from(jsonStr);
+ assertEquals(0, infos.length);
+
+ ConnectionFactory cf1 =
JMSUtil.createFactory(NettyConnectorFactory.class.getName(),
+
JMSServerControl2Test.CONNECTION_TTL,
+
JMSServerControl2Test.PING_PERIOD);
+ Connection connection = cf1.createConnection();
+ connection.setClientID(clientID);
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // create a durable subscriber
+ MessageConsumer consumer = session.createDurableSubscriber(topic, subName);
+
+ jsonStr = control.listConnectionsAsJSON();
+ assertNotNull(jsonStr);
+ infos = JMSConnectionInfo.from(jsonStr);
+ assertEquals(1, infos.length);
+ String connectionID = infos[0].getConnectionID();
+
+ String consJsonStr = control.listConsumersAsJSON(connectionID);
+ assertNotNull(consJsonStr);
+ JMSConsumerInfo[] consumerInfos = JMSConsumerInfo.from(consJsonStr);
+ assertEquals(1, consumerInfos.length);
+ JMSConsumerInfo consumerInfo = consumerInfos[0];
+ assertNotNull(consumerInfo.getConsumerID());
+ assertEquals(connectionID, consumerInfo.getConnectionID());
+ assertEquals(topic.getTopicName(), consumerInfo.getDestinationName());
+ assertEquals("topic", consumerInfo.getDestinationType());
+ assertNull(consumerInfo.getFilter());
+ assertEquals(false, consumerInfo.isBrowseOnly());
+ assertEquals(true, consumerInfo.isDurable());
+ assertTrue(startTime <= consumerInfo.getCreationTime() &&
consumerInfo.getCreationTime() <= System.currentTimeMillis());
+
+ consumer.close();
+
+ connection.close();
+ }
+ finally
+ {
+ if (serverManager != null)
+ {
+ serverManager.destroyTopic(topicName);
+ serverManager.stop();
+ }
+
+ if (server != null)
+ {
+ server.stop();
+ }
+ }
+ }
+
+ //https://jira.jboss.org/browse/HORNETQ-416
+ public void testProducerInfo() throws Exception
+ {
+ String queueName = RandomUtil.randomString();
+
+ System.out.println("queueName is: " + queueName);
+
+ try
+ {
+ startHornetQServer(NettyAcceptorFactory.class.getName());
+ serverManager.createQueue(false, queueName, null, true, queueName);
+ Queue queue = HornetQJMSClient.createQueue(queueName);
+
+ JMSServerControl control = createManagementControl();
+
+ long startTime = System.currentTimeMillis();
+
+ ConnectionFactory cf1 =
JMSUtil.createFactory(NettyConnectorFactory.class.getName(),
+
JMSServerControl2Test.CONNECTION_TTL,
+
JMSServerControl2Test.PING_PERIOD);
+ Connection connection = cf1.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer = session.createProducer(queue);
+
+ for (int i = 0; i < 10; i++)
+ {
+ TextMessage msg = session.createTextMessage("mymessage-" + i);
+ producer.send(msg);
+ }
+
+ connection.start();
+
+ // create a regular message consumer
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ TextMessage receivedMsg = null;
+ for (int i = 0; i < 10; i++)
+ {
+ receivedMsg = (TextMessage)consumer.receive(3000);
+ System.out.println("receiveMsg: " + receivedMsg);
+ }
+
+ String lastMsgID = receivedMsg.getJMSMessageID();
+ System.out.println("Last mid: " + lastMsgID);
+
+ String jsonStr = control.listConnectionsAsJSON();
+ JMSConnectionInfo[] infos = JMSConnectionInfo.from(jsonStr);
+
+ JMSConnectionInfo connInfo = infos[0];
+
+ String sessionsStr = control.listSessionsAsJSON(connInfo.getConnectionID());
+ JMSSessionInfo[] sessInfos = JMSSessionInfo.from(sessionsStr);
+
+ assertTrue(sessInfos.length > 0);
+ boolean lastMsgFound = false;
+ for (JMSSessionInfo sInfo : sessInfos)
+ {
+ System.out.println("Session name: " + sInfo.getSessionID());
+ assertNotNull(sInfo.getSessionID());
+ long createTime = sInfo.getCreationTime();
+ assertTrue(startTime <= createTime && createTime <=
System.currentTimeMillis());
+ String lastID = control.getLastSentMessageID(sInfo.getSessionID(),
"jms.queue." + queueName);
+ if (lastID != null)
+ {
+ assertEquals(lastMsgID, lastID);
+ lastMsgFound = true;
+ }
+ }
+ assertTrue(lastMsgFound);
+
+ consumer.close();
+
+ connection.close();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ throw e;
+ }
+ finally
+ {
+ if (serverManager != null)
+ {
+ serverManager.destroyQueue(queueName);
+ serverManager.stop();
+ }
+
+ if (server != null)
+ {
+ server.stop();
+ }
+ }
+ }
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -207,6 +513,95 @@
}
}
+ private void doListConnectionsAsJSON(final String acceptorFactory, final String
connectorFactory) throws Exception
+ {
+ try
+ {
+ startHornetQServer(acceptorFactory);
+
+ JMSServerControl control = createManagementControl();
+
+ long startTime = System.currentTimeMillis();
+
+ String jsonStr = control.listConnectionsAsJSON();
+ assertNotNull(jsonStr);
+ JMSConnectionInfo[] infos = JMSConnectionInfo.from(jsonStr);
+ assertEquals(0, infos.length);
+
+ ConnectionFactory cf1 = JMSUtil.createFactory(connectorFactory,
+
JMSServerControl2Test.CONNECTION_TTL,
+
JMSServerControl2Test.PING_PERIOD);
+ Connection connection = cf1.createConnection();
+
+ jsonStr = control.listConnectionsAsJSON();
+ assertNotNull(jsonStr);
+ infos = JMSConnectionInfo.from(jsonStr);
+ assertEquals(1, infos.length);
+ for (JMSConnectionInfo info : infos)
+ {
+ assertNotNull(info.getConnectionID());
+ assertNotNull(info.getClientAddress());
+ assertTrue(startTime <= info.getCreationTime() &&
info.getCreationTime() <= System.currentTimeMillis());
+ }
+
+ ConnectionFactory cf2 = JMSUtil.createFactory(connectorFactory,
+
JMSServerControl2Test.CONNECTION_TTL,
+
JMSServerControl2Test.PING_PERIOD);
+ Connection connection2 = cf2.createConnection();
+
+ jsonStr = control.listConnectionsAsJSON();
+ assertNotNull(jsonStr);
+ infos = JMSConnectionInfo.from(jsonStr);
+ assertEquals(2, infos.length);
+ for (JMSConnectionInfo info : infos)
+ {
+ assertNotNull(info.getConnectionID());
+ assertNotNull(info.getClientAddress());
+ assertTrue(startTime <= info.getCreationTime() &&
info.getCreationTime() <= System.currentTimeMillis());
+ assertNull(info.getClientID());
+ assertNull(info.getUsername());
+ }
+
+ connection.close();
+
+ waitForConnectionIDs(1, control);
+
+ connection2.close();
+
+ waitForConnectionIDs(0, control);
+
+ Connection connection3 = cf2.createConnection("guest",
"guest");
+ connection3.setClientID("MyClient");
+
+ jsonStr = control.listConnectionsAsJSON();
+ assertNotNull(jsonStr);
+
+ infos = JMSConnectionInfo.from(jsonStr);
+ JMSConnectionInfo info = infos[0];
+ assertEquals("MyClient", info.getClientID());
+ assertEquals("guest", info.getUsername());
+
+ connection3.close();
+
+ jsonStr = control.listConnectionsAsJSON();
+ assertNotNull(jsonStr);
+ infos = JMSConnectionInfo.from(jsonStr);
+ assertEquals(0, infos.length);
+ }
+ finally
+ {
+ if (serverManager != null)
+ {
+ serverManager.stop();
+ }
+
+ if (server != null)
+ {
+ server.stop();
+ }
+ }
+ }
+
private void waitForConnectionIDs(final int num, final JMSServerControl control)
throws Exception
{
final long timeout = 10000;
Modified:
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2010-11-06
00:03:12 UTC (rev 9851)
+++
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2010-11-08
14:22:13 UTC (rev 9852)
@@ -21,14 +21,12 @@
import javax.jms.Session;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.management.Parameter;
import org.hornetq.api.core.management.ResourceNames;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.management.JMSServerControl;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.security.Role;
import org.hornetq.jms.client.HornetQConnectionFactory;
-import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.client.HornetQQueue;
/**
@@ -233,6 +231,16 @@
{
return (String[])proxy.invokeOperation("listConnectionIDs");
}
+
+ public String listConnectionsAsJSON() throws Exception
+ {
+ return (String)proxy.invokeOperation("listConnectionsAsJSON");
+ }
+
+ public String listConsumersAsJSON(String connectionID) throws Exception
+ {
+ return (String)proxy.invokeOperation("listConsumersAsJSON",
connectionID);
+ }
public String[] listRemoteAddresses() throws Exception
{
@@ -275,6 +283,27 @@
return (Boolean)proxy.invokeOperation("createTopic", name,
jndiBinding);
}
+ public String[] listTargetDestinations(String sessionID) throws Exception
+ {
+ return null;
+ }
+
+ public String getLastSentMessageID(String sessionID, String address) throws
Exception
+ {
+ return null;
+ }
+
+ public String getSessionCreationTime(String sessionID) throws Exception
+ {
+ return null;
+ }
+
+ public String listSessionsAsJSON(String connectionID) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
public String listPreparedTransactionDetailsAsJSON() throws Exception
{
return
(String)proxy.invokeOperation("listPreparedTransactionDetailsAsJSON");
Modified:
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java 2010-11-06
00:03:12 UTC (rev 9851)
+++
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java 2010-11-08
14:22:13 UTC (rev 9852)
@@ -78,26 +78,36 @@
}
static MessageConsumer createConsumer(final Connection connection,
+ final Destination destination) throws
JMSException
+ {
+ return createConsumer(connection, destination, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ static MessageConsumer createConsumer(final Connection connection,
final Destination destination,
- final String connectorFactory) throws
JMSException
+ int ackMode) throws JMSException
{
- Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session s = connection.createSession(false, ackMode);
return s.createConsumer(destination);
}
- public static MessageConsumer createConsumer(final Connection connection, final
Destination destination) throws JMSException
+ static TopicSubscriber createDurableSubscriber(final Connection connection,
+ final Topic topic,
+ final String clientID,
+ final String subscriptionName) throws
JMSException
{
- return JMSUtil.createConsumer(connection, destination,
InVMConnectorFactory.class.getName());
+ return createDurableSubscriber(connection, topic, clientID, subscriptionName,
Session.AUTO_ACKNOWLEDGE);
}
-
+
static TopicSubscriber createDurableSubscriber(final Connection connection,
final Topic topic,
final String clientID,
- final String subscriptionName) throws
JMSException
+ final String subscriptionName,
+ final int ackMode) throws JMSException
{
connection.setClientID(clientID);
- Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session s = connection.createSession(false, ackMode);
return s.createDurableSubscriber(topic, subscriptionName);
}
Modified:
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java 2010-11-06
00:03:12 UTC (rev 9851)
+++
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java 2010-11-08
14:22:13 UTC (rev 9852)
@@ -13,15 +13,22 @@
package org.hornetq.tests.integration.jms.server.management;
+import static junit.framework.Assert.assertEquals;
+
import java.util.Map;
import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TopicSubscriber;
import junit.framework.Assert;
+import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.management.QueueControl;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.management.SubscriptionInfo;
import org.hornetq.api.jms.management.TopicControl;
@@ -406,7 +413,77 @@
{
}
}
+
+ public void testGetMessagesAdded() throws Exception
+ {
+ Connection connection_1 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ JMSUtil.createConsumer(connection_1, topic);
+ Connection connection_2 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
+ Connection connection_3 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName +
"2");
+ TopicControl topicControl = createManagementControl();
+
+ Assert.assertEquals(0, topicControl.getMessagesAdded());
+
+ JMSUtil.sendMessages(topic, 2);
+
+ Assert.assertEquals(3 * 2, topicControl.getMessagesAdded());
+
+ connection_1.close();
+ connection_2.close();
+ connection_3.close();
+ }
+
+ public void testGetMessagesDelivering() throws Exception
+ {
+ Connection connection_1 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ MessageConsumer cons_1 = JMSUtil.createConsumer(connection_1, topic,
Session.CLIENT_ACKNOWLEDGE);
+ Connection connection_2 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ MessageConsumer cons_2 = JMSUtil.createDurableSubscriber(connection_2, topic,
clientID, subscriptionName, Session.CLIENT_ACKNOWLEDGE);
+ Connection connection_3 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ MessageConsumer cons_3 = JMSUtil.createDurableSubscriber(connection_3, topic,
clientID, subscriptionName + "2", Session.CLIENT_ACKNOWLEDGE);
+
+ TopicControl topicControl = createManagementControl();
+
+ assertEquals(0, topicControl.getDeliveringCount());
+
+ JMSUtil.sendMessages(topic, 2);
+
+ assertEquals(0, topicControl.getDeliveringCount());
+
+ connection_1.start();
+ connection_2.start();
+ connection_3.start();
+
+ Message msg_1 = null;
+ Message msg_2 = null;
+ Message msg_3 = null;
+ for (int i = 0; i < 2; i++)
+ {
+ msg_1 = cons_1.receive(5000);
+ assertNotNull(msg_1);
+ msg_2 = cons_2.receive(5000);
+ assertNotNull(msg_2);
+ msg_3 = cons_3.receive(5000);
+ assertNotNull(msg_3);
+ }
+
+ assertEquals(3 * 2, topicControl.getDeliveringCount());
+
+ msg_1.acknowledge();
+ assertEquals(2 * 2, topicControl.getDeliveringCount());
+ msg_2.acknowledge();
+ assertEquals(1 * 2, topicControl.getDeliveringCount());
+ msg_3.acknowledge();
+ assertEquals(0, topicControl.getDeliveringCount());
+
+ connection_1.close();
+ connection_2.close();
+ connection_3.close();
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified:
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java 2010-11-06
00:03:12 UTC (rev 9851)
+++
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java 2010-11-08
14:22:13 UTC (rev 9852)
@@ -13,9 +13,12 @@
package org.hornetq.tests.integration.jms.server.management;
+import static junit.framework.Assert.assertEquals;
import static org.hornetq.tests.util.RandomUtil.randomString;
import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.Session;
@@ -26,6 +29,7 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.management.ResourceNames;
import org.hornetq.api.jms.HornetQJMSClient;
+import org.hornetq.api.jms.management.TopicControl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
@@ -333,6 +337,71 @@
}
}
+ public void testGetMessagesAdded() throws Exception
+ {
+ Connection connection_1 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ JMSUtil.createConsumer(connection_1, topic);
+ Connection connection_2 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
+ Connection connection_3 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName +
"2");
+
+ assertEquals(0, proxy.retrieveAttributeValue("messagesAdded"));
+
+ JMSUtil.sendMessages(topic, 2);
+
+ assertEquals(3 * 2, proxy.retrieveAttributeValue("messagesAdded"));
+
+ connection_1.close();
+ connection_2.close();
+ connection_3.close();
+ }
+
+ public void testGetMessagesDelivering() throws Exception
+ {
+ Connection connection_1 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ MessageConsumer cons_1 = JMSUtil.createConsumer(connection_1, topic,
Session.CLIENT_ACKNOWLEDGE);
+ Connection connection_2 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ MessageConsumer cons_2 = JMSUtil.createDurableSubscriber(connection_2, topic,
clientID, subscriptionName, Session.CLIENT_ACKNOWLEDGE);
+ Connection connection_3 =
JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ MessageConsumer cons_3 = JMSUtil.createDurableSubscriber(connection_3, topic,
clientID, subscriptionName + "2", Session.CLIENT_ACKNOWLEDGE);
+
+ assertEquals(0, proxy.retrieveAttributeValue("deliveringCount"));
+
+ JMSUtil.sendMessages(topic, 2);
+
+ assertEquals(0, proxy.retrieveAttributeValue("deliveringCount"));
+
+ connection_1.start();
+ connection_2.start();
+ connection_3.start();
+
+ Message msg_1 = null;
+ Message msg_2 = null;
+ Message msg_3 = null;
+ for (int i = 0; i < 2; i++)
+ {
+ msg_1 = cons_1.receive(5000);
+ assertNotNull(msg_1);
+ msg_2 = cons_2.receive(5000);
+ assertNotNull(msg_2);
+ msg_3 = cons_3.receive(5000);
+ assertNotNull(msg_3);
+ }
+
+ assertEquals(3 * 2, proxy.retrieveAttributeValue("deliveringCount"));
+
+ msg_1.acknowledge();
+ assertEquals(2 * 2, proxy.retrieveAttributeValue("deliveringCount"));
+ msg_2.acknowledge();
+ assertEquals(1 * 2, proxy.retrieveAttributeValue("deliveringCount"));
+ msg_3.acknowledge();
+ assertEquals(0, proxy.retrieveAttributeValue("deliveringCount"));
+
+ connection_1.close();
+ connection_2.close();
+ connection_3.close();
+ }
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2010-11-06
00:03:12 UTC (rev 9851)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2010-11-08
14:22:13 UTC (rev 9852)
@@ -348,6 +348,12 @@
return name;
}
+ public SimpleString getAddress()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
/* (non-Javadoc)
* @see org.hornetq.core.server.Queue#getID()
*/