JBoss hornetq SVN: r9711 - in branches/hornetq-416: src/main/org/hornetq/core/server and 4 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-09-22 05:07:31 -0400 (Wed, 22 Sep 2010)
New Revision: 9711
Added:
branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSConsumerInfo.java
Modified:
branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSServerControl.java
branches/hornetq-416/src/main/org/hornetq/core/server/Queue.java
branches/hornetq-416/src/main/org/hornetq/core/server/ServerConsumer.java
branches/hornetq-416/src/main/org/hornetq/core/server/ServerSession.java
branches/hornetq-416/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/hornetq-416/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/hornetq-416/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java
branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
branches/hornetq-416/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
Log:
https://jira.jboss.org/browse/HORNETQ-416
* add JMSConsumerInfo and JMSServerControl.listConsumersAsJSON()
* add ServerConsumer.getCreationTime() and isBrowseOnly() methods
Added: branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSConsumerInfo.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSConsumerInfo.java (rev 0)
+++ branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSConsumerInfo.java 2010-09-22 09:07:31 UTC (rev 9711)
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.api.jms.management;
+
+import org.hornetq.utils.json.JSONArray;
+import org.hornetq.utils.json.JSONObject;
+
+/**
+ * Helper class to create Java Objects from the
+ * JSON serialization returned by {@link JMSServerControl#listConsumersAsJSON(String)} and related methods.
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ */
+public class JMSConsumerInfo
+{
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final String consumerID;
+
+ private final String connectionID;
+
+ private final String destinationName;
+
+ private final String destinationType;
+
+ private final boolean browseOnly;
+
+ private final long creationTime;
+
+ private final boolean durable;
+
+ private final String filter;
+
+ // Static --------------------------------------------------------
+
+ /**
+ * Returns an array of JMSConsumerInfo corresponding to the JSON serialization returned
+ * by {@link JMSServerControl#listConsumersAsJSON(String)} and related methods.
+ */
+ public static JMSConsumerInfo[] from(final String jsonString) throws Exception
+ {
+ JSONArray array = new JSONArray(jsonString);
+ JMSConsumerInfo[] infos = new JMSConsumerInfo[array.length()];
+ for (int i = 0; i < array.length(); i++)
+ {
+ JSONObject sub = array.getJSONObject(i);
+ JMSConsumerInfo info = new JMSConsumerInfo(sub.getString("consumerID"),
+ sub.getString("connectionID"),
+ sub.getString("destinationName"),
+ sub.getString("destinationType"),
+ sub.getBoolean("browseOnly"),
+ sub.getLong("creationTime"),
+ sub.getBoolean("durable"),
+ sub.optString("filter", null));
+ infos[i] = info;
+ }
+
+ return infos;
+ }
+
+ // Constructors --------------------------------------------------
+
+ private JMSConsumerInfo(final String consumerID,
+ final String connectionID,
+ final String destinationName,
+ final String destinationType,
+ final boolean browseOnly,
+ final long creationTime,
+ final boolean durable,
+ final String filter)
+ {
+ this.consumerID = consumerID;
+ this.connectionID = connectionID;
+ this.destinationName = destinationName;
+ this.destinationType = destinationType;
+ this.browseOnly = browseOnly;
+ this.creationTime = creationTime;
+ this.durable = durable;
+ this.filter = filter;
+ }
+
+ // Public --------------------------------------------------------
+
+ public String getConsumerID()
+ {
+ return consumerID;
+ }
+
+ public String getConnectionID()
+ {
+ return connectionID;
+ }
+
+ public String getDestinationName()
+ {
+ return destinationName;
+ }
+
+ public String getDestinationType()
+ {
+ return destinationType;
+ }
+
+ public boolean isBrowseOnly()
+ {
+ return browseOnly;
+ }
+
+ public long getCreationTime()
+ {
+ return creationTime;
+ }
+
+ /**
+ * @return {@code true} if this consumer is a durable topic subscription, {@code false} otherwise
+ */
+ public boolean isDurable()
+ {
+ return durable;
+ }
+
+ public String getFilter()
+ {
+ return filter;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+}
Modified: branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSServerControl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSServerControl.java 2010-09-21 22:27:14 UTC (rev 9710)
+++ branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSServerControl.java 2010-09-22 09:07:31 UTC (rev 9711)
@@ -228,6 +228,13 @@
@Operation(desc = "List all the connection IDs", impact = MBeanOperationInfo.INFO)
String[] listConnectionIDs() throws Exception;
+ /**
+ * Lists all the connections connected to this server.
+ * The returned String is a JSON string containing an array of JMSConnectionInfo objects.
+ *
+ * @see JMSConnectionInfo#from(String)
+ */
+ @Operation(desc = "List all JMS connections")
String listConnectionsAsJSON() throws Exception;
/**
@@ -236,5 +243,14 @@
@Operation(desc = "List the sessions for the given connectionID", impact = MBeanOperationInfo.INFO)
String[] listSessions(@Parameter(desc = "a connection ID", name = "connectionID") String connectionID) throws Exception;
+ /**
+ * Lists all the consumers which belong to the JMS Connection specified by the connectionID.
+ * The returned String is a JSON string containing an array of JMSConsumerInfo objects.
+ *
+ * @see JMSConsumerInfo#from(String)
+ */
+ @Operation(desc = "List all JMS consumers associated to a JMS Connection")
+ String listConsumersAsJSON(@Parameter(desc = "a connection ID", name = "connectionID") String connectionID) throws Exception;
+
}
Modified: branches/hornetq-416/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/server/Queue.java 2010-09-21 22:27:14 UTC (rev 9710)
+++ branches/hornetq-416/src/main/org/hornetq/core/server/Queue.java 2010-09-22 09:07:31 UTC (rev 9711)
@@ -157,4 +157,6 @@
void close() throws Exception;
boolean isDirectDeliver();
+
+ SimpleString getAddress();
}
Modified: branches/hornetq-416/src/main/org/hornetq/core/server/ServerConsumer.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/server/ServerConsumer.java 2010-09-21 22:27:14 UTC (rev 9710)
+++ branches/hornetq-416/src/main/org/hornetq/core/server/ServerConsumer.java 2010-09-22 09:07:31 UTC (rev 9711)
@@ -47,6 +47,10 @@
void forceDelivery(long sequence);
void setTransferring(boolean transferring);
+
+ boolean isBrowseOnly();
+
+ long getCreationTime();
}
Modified: branches/hornetq-416/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/server/ServerSession.java 2010-09-21 22:27:14 UTC (rev 9710)
+++ branches/hornetq-416/src/main/org/hornetq/core/server/ServerSession.java 2010-09-22 09:07:31 UTC (rev 9711)
@@ -14,6 +14,7 @@
package org.hornetq.core.server;
import java.util.List;
+import java.util.Set;
import javax.transaction.xa.Xid;
@@ -110,4 +111,6 @@
void close(boolean failed) throws Exception;
void setTransferring(boolean transferring);
+
+ Set<ServerConsumer> getServerConsumers();
}
Modified: branches/hornetq-416/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-09-21 22:27:14 UTC (rev 9710)
+++ branches/hornetq-416/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-09-22 09:07:31 UTC (rev 9711)
@@ -238,6 +238,11 @@
{
return name;
}
+
+ public SimpleString getAddress()
+ {
+ return address;
+ }
public long getID()
{
Modified: branches/hornetq-416/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-09-21 22:27:14 UTC (rev 9710)
+++ branches/hornetq-416/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2010-09-22 09:07:31 UTC (rev 9711)
@@ -118,6 +118,8 @@
private boolean transferring = false;
+ private final long creationTime;
+
// Constructors ---------------------------------------------------------------------------------
public ServerConsumerImpl(final long id,
@@ -160,6 +162,8 @@
this.strictUpdateDeliveryCount = strictUpdateDeliveryCount;
+ this.creationTime = System.currentTimeMillis();
+
if (browseOnly)
{
browserDeliverer = new BrowserDeliverer(messageQueue.iterator());
@@ -177,7 +181,17 @@
{
return id;
}
+
+ public boolean isBrowseOnly()
+ {
+ return browseOnly;
+ }
+ public long getCreationTime()
+ {
+ return creationTime;
+ }
+
public HandleStatus handle(final MessageReference ref) throws Exception
{
if (availableCredits != null && availableCredits.get() <= 0)
Modified: branches/hornetq-416/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-09-21 22:27:14 UTC (rev 9710)
+++ branches/hornetq-416/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-09-22 09:07:31 UTC (rev 9711)
@@ -16,6 +16,7 @@
import static org.hornetq.api.core.management.NotificationType.CONSUMER_CREATED;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -232,6 +233,11 @@
{
return remotingConnection.getID();
}
+
+ public Set<ServerConsumer> getServerConsumers() {
+ Set<ServerConsumer> consumersClone = new HashSet<ServerConsumer>(consumers.values());
+ return Collections.unmodifiableSet(consumersClone);
+ }
public void removeConsumer(final long consumerID) throws Exception
{
Modified: branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2010-09-21 22:27:14 UTC (rev 9710)
+++ branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2010-09-22 09:07:31 UTC (rev 9711)
@@ -36,8 +36,13 @@
import org.hornetq.api.jms.management.JMSQueueControl;
import org.hornetq.api.jms.management.JMSServerControl;
import org.hornetq.api.jms.management.TopicControl;
+import org.hornetq.core.filter.Filter;
import org.hornetq.core.management.impl.MBeanInfoHelper;
+import org.hornetq.core.server.ServerConsumer;
import org.hornetq.core.server.ServerSession;
+import org.hornetq.jms.client.HornetQDestination;
+import org.hornetq.jms.client.HornetQQueue;
+import org.hornetq.jms.client.HornetQTopic;
import org.hornetq.jms.server.JMSServerManager;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.utils.json.JSONArray;
@@ -90,6 +95,38 @@
}
return trimmed;
}
+
+ private static String[] determineJMSDestination(String coreAddress)
+ {
+ String[] result = new String[2]; // destination name & type
+ if (coreAddress.startsWith(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX))
+ {
+ result[0] = coreAddress.substring(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX.length());
+ result[1] = "queue";
+ }
+ else if (coreAddress.startsWith(HornetQQueue.JMS_TEMP_QUEUE_ADDRESS_PREFIX))
+ {
+ result[0] = coreAddress.substring(HornetQQueue.JMS_TEMP_QUEUE_ADDRESS_PREFIX.length());
+ result[1] = "tempqueue";
+ }
+ else if (coreAddress.startsWith(HornetQQueue.JMS_TOPIC_ADDRESS_PREFIX))
+ {
+ result[0] = coreAddress.substring(HornetQQueue.JMS_TOPIC_ADDRESS_PREFIX.length());
+ result[1] = "topic";
+ }
+ else if (coreAddress.startsWith(HornetQQueue.JMS_TEMP_TOPIC_ADDRESS_PREFIX))
+ {
+ result[0] = coreAddress.substring(HornetQQueue.JMS_TEMP_TOPIC_ADDRESS_PREFIX.length());
+ result[1] = "temptopic";
+ }
+ else
+ {
+ System.out.println("JMSServerControlImpl.determineJMSDestination()" + coreAddress);
+ // not related to JMS
+ return null;
+ }
+ return result;
+ }
private static List<Pair<TransportConfiguration, TransportConfiguration>> convertToConnectorPairs(final Object[] liveConnectorsTransportClassNames,
final Object[] liveConnectorTransportParams,
@@ -598,7 +635,74 @@
blockOnIO();
}
}
-
+
+ public String listConsumersAsJSON(String connectionID) throws Exception
+ {
+ checkStarted();
+
+ clearIO();
+
+ try
+ {
+ JSONArray array = new JSONArray();
+
+ Set<RemotingConnection> connections = server.getHornetQServer().getRemotingService().getConnections();
+ for (RemotingConnection connection : connections)
+ {
+ if (connectionID.equals(connection.getID().toString()))
+ {
+ List<ServerSession> sessions = server.getHornetQServer().getSessions(connectionID);
+ for (ServerSession session : sessions)
+ {
+ Set<ServerConsumer> consumers = session.getServerConsumers();
+ for (ServerConsumer consumer : consumers)
+ {
+ JSONObject obj = new JSONObject();
+ obj.put("consumerID", consumer.getID());
+ obj.put("connectionID", connectionID);
+ obj.put("queueName", consumer.getQueue().getName().toString());
+ obj.put("browseOnly", consumer.isBrowseOnly());
+ obj.put("creationTime", consumer.getCreationTime());
+ // JMS consumers with a message filter use the queue's filter
+ Filter queueFilter = consumer.getQueue().getFilter();
+ if (queueFilter != null)
+ {
+ obj.put("filter", queueFilter.getFilterString().toString());
+ }
+ String[] destinationInfo = determineJMSDestination(consumer.getQueue().getAddress().toString());
+ if (destinationInfo == null)
+ {
+ continue;
+ }
+ obj.put("destinationName", destinationInfo[0]);
+ obj.put("destinationType", destinationInfo[1]);
+ if (destinationInfo[1].equals("topic")) {
+ try
+ {
+ HornetQDestination.decomposeQueueNameForDurableSubscription(consumer.getQueue().getName().toString());
+ obj.put("durable", true);
+ } catch (IllegalArgumentException e)
+ {
+ obj.put("durable", false);
+ }
+ }
+ else
+ {
+ obj.put("durable", false);
+ }
+ array.put(obj);
+ }
+ }
+ }
+ }
+ return array.toString();
+ }
+ finally
+ {
+ blockOnIO();
+ }
+ }
+
public String[] listSessions(final String connectionID) throws Exception
{
checkStarted();
Modified: branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java
===================================================================
--- branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java 2010-09-21 22:27:14 UTC (rev 9710)
+++ branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java 2010-09-22 09:07:31 UTC (rev 9711)
@@ -21,11 +21,19 @@
import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.TemporaryTopic;
+import javax.jms.Topic;
import junit.framework.Assert;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.management.JMSConnectionInfo;
+import org.hornetq.api.jms.management.JMSConsumerInfo;
import org.hornetq.api.jms.management.JMSServerControl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -36,6 +44,7 @@
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.HornetQServers;
+import org.hornetq.jms.client.HornetQQueue;
import org.hornetq.jms.server.impl.JMSServerManagerImpl;
import org.hornetq.tests.integration.management.ManagementControlHelper;
import org.hornetq.tests.integration.management.ManagementTestBase;
@@ -150,7 +159,195 @@
{
doListConnectionsAsJSON(InVMAcceptorFactory.class.getName(), InVMConnectorFactory.class.getName());
}
+
+ public void testListConsumersAsJSON() throws Exception
+ {
+ String queueName = RandomUtil.randomString();
+ try
+ {
+ startHornetQServer(NettyAcceptorFactory.class.getName());
+ serverManager.createQueue(false, queueName, null, true, queueName);
+ Queue queue = HornetQJMSClient.createQueue(queueName);
+
+ JMSServerControl control = createManagementControl();
+
+ long startTime = System.currentTimeMillis();
+
+ String jsonStr = control.listConnectionsAsJSON();
+ assertNotNull(jsonStr);
+ JMSConnectionInfo[] infos = JMSConnectionInfo.from(jsonStr);
+ assertEquals(0, infos.length);
+
+ ConnectionFactory cf1 = JMSUtil.createFactory(NettyConnectorFactory.class.getName(),
+ JMSServerControl2Test.CONNECTION_TTL,
+ JMSServerControl2Test.PING_PERIOD);
+ Connection connection = cf1.createConnection();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ TemporaryTopic temporaryTopic = session.createTemporaryTopic();
+
+ // create a regular message consumer
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ jsonStr = control.listConnectionsAsJSON();
+ assertNotNull(jsonStr);
+ infos = JMSConnectionInfo.from(jsonStr);
+ assertEquals(1, infos.length);
+ String connectionID = infos[0].getConnectionID();
+
+ String consJsonStr = control.listConsumersAsJSON(connectionID);
+ assertNotNull(consJsonStr);
+ JMSConsumerInfo[] consumerInfos = JMSConsumerInfo.from(consJsonStr);
+ assertEquals(1, consumerInfos.length);
+ JMSConsumerInfo consumerInfo = consumerInfos[0];
+ assertNotNull(consumerInfo.getConsumerID());
+ assertEquals(connectionID, consumerInfo.getConnectionID());
+ assertEquals(queue.getQueueName(), consumerInfo.getDestinationName());
+ assertEquals("queue", consumerInfo.getDestinationType());
+ assertNull(consumerInfo.getFilter());
+ assertEquals(false, consumerInfo.isBrowseOnly());
+ assertEquals(false, consumerInfo.isDurable());
+ assertTrue(startTime <= consumerInfo.getCreationTime() && consumerInfo.getCreationTime() <= System.currentTimeMillis());
+
+ consumer.close();
+
+ consJsonStr = control.listConsumersAsJSON(connectionID);
+ assertNotNull(consJsonStr);
+ consumerInfos = JMSConsumerInfo.from(consJsonStr);
+ assertEquals(0, consumerInfos.length);
+
+ // create a queue browser
+ QueueBrowser browser = session.createBrowser(queue);
+ // the server resources are created when the browser starts enumerating
+ browser.getEnumeration();
+
+ consJsonStr = control.listConsumersAsJSON(connectionID);
+ assertNotNull(consJsonStr);
+ System.out.println(consJsonStr);
+ consumerInfos = JMSConsumerInfo.from(consJsonStr);
+ assertEquals(1, consumerInfos.length);
+ consumerInfo = consumerInfos[0];
+ assertNotNull(consumerInfo.getConsumerID());
+ assertEquals(connectionID, consumerInfo.getConnectionID());
+ assertEquals(queue.getQueueName(), consumerInfo.getDestinationName());
+ assertEquals("queue", consumerInfo.getDestinationType());
+ assertNull(consumerInfo.getFilter());
+ assertEquals(true, consumerInfo.isBrowseOnly());
+ assertEquals(false, consumerInfo.isDurable());
+ assertTrue(startTime <= consumerInfo.getCreationTime() && consumerInfo.getCreationTime() <= System.currentTimeMillis());
+
+ browser.close();
+
+ // create a regular consumer w/ filter on a temp topic
+ String filter = "color = 'red'";
+ consumer = session.createConsumer(temporaryTopic, filter);
+
+ consJsonStr = control.listConsumersAsJSON(connectionID);
+ assertNotNull(consJsonStr);
+ System.out.println(consJsonStr);
+ consumerInfos = JMSConsumerInfo.from(consJsonStr);
+ assertEquals(1, consumerInfos.length);
+ consumerInfo = consumerInfos[0];
+ assertNotNull(consumerInfo.getConsumerID());
+ assertEquals(connectionID, consumerInfo.getConnectionID());
+ assertEquals(temporaryTopic.getTopicName(), consumerInfo.getDestinationName());
+ assertEquals("temptopic", consumerInfo.getDestinationType());
+ assertEquals(filter, consumerInfo.getFilter());
+ assertEquals(false, consumerInfo.isBrowseOnly());
+ assertEquals(false, consumerInfo.isDurable());
+ assertTrue(startTime <= consumerInfo.getCreationTime() && consumerInfo.getCreationTime() <= System.currentTimeMillis());
+
+ consumer.close();
+
+ connection.close();
+ }
+ finally
+ {
+ if (serverManager != null)
+ {
+ serverManager.destroyQueue(queueName);
+ serverManager.stop();
+ }
+
+ if (server != null)
+ {
+ server.stop();
+ }
+ }
+ }
+
+ /**
+ * test for durable subscriber
+ */
+ public void testListConsumersAsJSON2() throws Exception
+ {
+ String topicName = RandomUtil.randomString();
+ String clientID = RandomUtil.randomString();
+ String subName = RandomUtil.randomString();
+
+ try
+ {
+ startHornetQServer(NettyAcceptorFactory.class.getName());
+ serverManager.createTopic(false, topicName, topicName);
+ Topic topic = HornetQJMSClient.createTopic(topicName);
+
+ JMSServerControl control = createManagementControl();
+
+ long startTime = System.currentTimeMillis();
+
+ String jsonStr = control.listConnectionsAsJSON();
+ assertNotNull(jsonStr);
+ JMSConnectionInfo[] infos = JMSConnectionInfo.from(jsonStr);
+ assertEquals(0, infos.length);
+
+ ConnectionFactory cf1 = JMSUtil.createFactory(NettyConnectorFactory.class.getName(),
+ JMSServerControl2Test.CONNECTION_TTL,
+ JMSServerControl2Test.PING_PERIOD);
+ Connection connection = cf1.createConnection();
+ connection.setClientID(clientID);
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ // create a durable subscriber
+ MessageConsumer consumer = session.createDurableSubscriber(topic, subName);
+
+ jsonStr = control.listConnectionsAsJSON();
+ assertNotNull(jsonStr);
+ infos = JMSConnectionInfo.from(jsonStr);
+ assertEquals(1, infos.length);
+ String connectionID = infos[0].getConnectionID();
+
+ String consJsonStr = control.listConsumersAsJSON(connectionID);
+ assertNotNull(consJsonStr);
+ JMSConsumerInfo[] consumerInfos = JMSConsumerInfo.from(consJsonStr);
+ assertEquals(1, consumerInfos.length);
+ JMSConsumerInfo consumerInfo = consumerInfos[0];
+ assertNotNull(consumerInfo.getConsumerID());
+ assertEquals(connectionID, consumerInfo.getConnectionID());
+ assertEquals(topic.getTopicName(), consumerInfo.getDestinationName());
+ assertEquals("topic", consumerInfo.getDestinationType());
+ assertNull(consumerInfo.getFilter());
+ assertEquals(false, consumerInfo.isBrowseOnly());
+ assertEquals(true, consumerInfo.isDurable());
+ assertTrue(startTime <= consumerInfo.getCreationTime() && consumerInfo.getCreationTime() <= System.currentTimeMillis());
+
+ consumer.close();
+
+ connection.close();
+ }
+ finally
+ {
+ if (serverManager != null)
+ {
+ serverManager.destroyTopic(topicName);
+ serverManager.stop();
+ }
+
+ if (server != null)
+ {
+ server.stop();
+ }
+ }
+ }
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -246,7 +443,7 @@
{
assertNotNull(info.getConnectionID());
assertNotNull(info.getClientAddress());
- assertTrue(startTime < info.getCreationTime() && info.getCreationTime() < System.currentTimeMillis());
+ assertTrue(startTime <= info.getCreationTime() && info.getCreationTime() <= System.currentTimeMillis());
}
ConnectionFactory cf2 = JMSUtil.createFactory(connectorFactory,
@@ -262,7 +459,7 @@
{
assertNotNull(info.getConnectionID());
assertNotNull(info.getClientAddress());
- assertTrue(startTime < info.getCreationTime() && info.getCreationTime() < System.currentTimeMillis());
+ assertTrue(startTime <= info.getCreationTime() && info.getCreationTime() <= System.currentTimeMillis());
}
connection.close();
Modified: branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
===================================================================
--- branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2010-09-21 22:27:14 UTC (rev 9710)
+++ branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2010-09-22 09:07:31 UTC (rev 9711)
@@ -238,6 +238,11 @@
{
return (String)proxy.invokeOperation("listConnectionsAsJSON");
}
+
+ public String listConsumersAsJSON(String connectionID) throws Exception
+ {
+ return (String)proxy.invokeOperation("listConsumersAsJSON", connectionID);
+ }
public String[] listRemoteAddresses() throws Exception
{
Modified: branches/hornetq-416/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- branches/hornetq-416/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2010-09-21 22:27:14 UTC (rev 9710)
+++ branches/hornetq-416/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2010-09-22 09:07:31 UTC (rev 9711)
@@ -348,6 +348,12 @@
return name;
}
+ public SimpleString getAddress()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
/* (non-Javadoc)
* @see org.hornetq.core.server.Queue#getID()
*/
14 years, 5 months
JBoss hornetq SVN: r9710 - trunk/src/main/org/hornetq/core/postoffice/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-09-21 18:27:14 -0400 (Tue, 21 Sep 2010)
New Revision: 9710
Modified:
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
Log:
HORNETQ-521 - Fixing failing test on ClusteredGroupingTest
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-09-21 22:25:29 UTC (rev 9709)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-09-21 22:27:14 UTC (rev 9710)
@@ -482,7 +482,14 @@
if (binding.getType() == BindingType.LOCAL_QUEUE)
{
managementService.unregisterQueue(uniqueName, binding.getAddress());
-
+ }
+ else if (binding.getType() == BindingType.DIVERT)
+ {
+ managementService.unregisterDivert(uniqueName);
+ }
+
+ if (binding.getType() != BindingType.DIVERT)
+ {
TypedProperties props = new TypedProperties();
props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
@@ -495,10 +502,6 @@
managementService.sendNotification(new Notification(null, NotificationType.BINDING_REMOVED, props));
}
- else if (binding.getType() == BindingType.DIVERT)
- {
- managementService.unregisterDivert(uniqueName);
- }
binding.close();
14 years, 5 months
JBoss hornetq SVN: r9709 - in trunk: src/main/org/hornetq/core/paging/impl and 4 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-09-21 18:25:29 -0400 (Tue, 21 Sep 2010)
New Revision: 9709
Modified:
trunk/src/main/org/hornetq/core/paging/PagingStore.java
trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java
trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/server/HornetQServer.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
HORNETQ-523 - Ordering issue with TX and paging
Modified: trunk/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/PagingStore.java 2010-09-21 14:18:07 UTC (rev 9708)
+++ trunk/src/main/org/hornetq/core/paging/PagingStore.java 2010-09-21 22:25:29 UTC (rev 9709)
@@ -13,6 +13,8 @@
package org.hornetq.core.paging;
+import java.util.List;
+
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.server.ServerMessage;
@@ -49,7 +51,7 @@
void sync() throws Exception;
- boolean page(ServerMessage message, long transactionId) throws Exception;
+ boolean page(List<ServerMessage> messages, long transactionId) throws Exception;
boolean page(ServerMessage message) throws Exception;
Modified: trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java 2010-09-21 14:18:07 UTC (rev 9708)
+++ trunk/src/main/org/hornetq/core/paging/impl/PageImpl.java 2010-09-21 22:25:29 UTC (rev 9709)
@@ -236,6 +236,11 @@
{
return size.intValue();
}
+
+ public String toString()
+ {
+ return "PageImpl::pageID=" + this.pageId + ", file=" + this.file;
+ }
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2010-09-21 14:18:07 UTC (rev 9708)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingManagerImpl.java 2010-09-21 22:25:29 UTC (rev 9709)
@@ -235,11 +235,16 @@
// Private -------------------------------------------------------
- private PagingStore newStore(final SimpleString address) throws Exception
+ protected PagingStore newStore(final SimpleString address) throws Exception
{
return pagingStoreFactory.newStore(address,
addressSettingsRepository.getMatch(address.toString()));
}
+
+ protected PagingStoreFactory getStoreFactory()
+ {
+ return pagingStoreFactory;
+ }
// Inner classes -------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java 2010-09-21 14:18:07 UTC (rev 9708)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreFactoryNIO.java 2010-09-21 22:25:29 UTC (rev 9709)
@@ -220,6 +220,26 @@
{
return new NIOSequentialFileFactory(directory + File.separatorChar + directoryName, false);
}
+
+ protected PagingManager getPagingManager()
+ {
+ return pagingManager;
+ }
+
+ protected StorageManager getStorageManager()
+ {
+ return storageManager;
+ }
+
+ protected PostOffice getPostOffice()
+ {
+ return postOffice;
+ }
+
+ protected ExecutorFactory getExecutorFactory()
+ {
+ return executorFactory;
+ }
// Private -------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-09-21 14:18:07 UTC (rev 9708)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-09-21 22:25:29 UTC (rev 9709)
@@ -14,6 +14,7 @@
package org.hornetq.core.paging.impl;
import java.text.DecimalFormat;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -110,7 +111,7 @@
private volatile Page currentPage;
private final ReentrantLock writeLock = new ReentrantLock();
-
+
/** duplicate cache used at this address */
private final DuplicateIDCache duplicateCache;
@@ -186,7 +187,7 @@
this.storeFactory = storeFactory;
this.syncNonTransactional = syncNonTransactional;
-
+
// Post office could be null on the backup node
if (postOffice == null)
{
@@ -196,7 +197,7 @@
{
this.duplicateCache = postOffice.getDuplicateIDCache(storeName);
}
-
+
}
// Public --------------------------------------------------------
@@ -263,7 +264,7 @@
return storeName;
}
- public boolean page(final ServerMessage message, final long transactionID) throws Exception
+ public boolean page(final List<ServerMessage> message, final long transactionID) throws Exception
{
// The sync on transactions is done on commit only
return page(message, transactionID, false);
@@ -273,7 +274,7 @@
{
// If non Durable, there is no need to sync as there is no requirement for persistence for those messages in case
// of crash
- return page(message, -1, syncNonTransactional && message.isDurable());
+ return page(Arrays.asList(message), -1, syncNonTransactional && message.isDurable());
}
public void sync() throws Exception
@@ -541,7 +542,6 @@
writeLock.lock();
currentPageLock.writeLock().lock(); // Make sure no checks are done on currentPage while we are depaging
-
try
{
if (!running)
@@ -597,6 +597,7 @@
{
returnPage = createPage(firstPageId++);
}
+
return returnPage;
}
}
@@ -619,10 +620,14 @@
* @return
* @throws Exception
*/
- private boolean readPage() throws Exception
+ protected boolean readPage() throws Exception
{
Page page = depage();
+ // Only depage() should run while locked; doing more under the lock
+ // would mean holding it for a long time.
+ // The reading (IO part) should happen outside of any locks
+
if (page == null)
{
return false;
@@ -630,8 +635,8 @@
page.open();
- List<PagedMessage> messages = null;
-
+ List<PagedMessage> messages = null;
+
try
{
messages = page.read();
@@ -688,25 +693,25 @@
class OurRunnable implements Runnable
{
boolean ran;
-
+
final Runnable runnable;
-
+
OurRunnable(final Runnable runnable)
{
this.runnable = runnable;
}
-
+
public synchronized void run()
{
if (!ran)
{
runnable.run();
-
+
ran = true;
}
}
}
-
+
public void executeRunnableWhenMemoryAvailable(final Runnable runnable)
{
if (addressFullMessagePolicy == AddressFullMessagePolicy.BLOCK && maxSize != -1)
@@ -714,23 +719,23 @@
if (sizeInBytes.get() > maxSize)
{
OurRunnable ourRunnable = new OurRunnable(runnable);
-
+
onMemoryFreedRunnables.add(ourRunnable);
-
- //We check again to avoid a race condition where the size can come down just after the element
- //has been added, but the check to execute was done before the element was added
- //NOTE! We do not fix this race by locking the whole thing, doing this check provides
- //MUCH better performance in a highly concurrent environment
+
+ // We check again to avoid a race condition where the size can come down just after the element
+ // has been added, but the check to execute was done before the element was added
+ // NOTE! We do not fix this race by locking the whole thing, doing this check provides
+ // MUCH better performance in a highly concurrent environment
if (sizeInBytes.get() <= maxSize)
{
- //run it now
+ // run it now
ourRunnable.run();
}
return;
}
}
-
+
runnable.run();
}
@@ -797,9 +802,7 @@
}
- private boolean page(final ServerMessage message,
- final long transactionID,
- final boolean sync) throws Exception
+ protected boolean page(final List<ServerMessage> messages, final long transactionID, final boolean sync) throws Exception
{
if (!running)
{
@@ -857,60 +860,63 @@
return false;
}
- PagedMessage pagedMessage;
-
- if (!message.isDurable())
+ for (ServerMessage message : messages)
{
- // The address should never be transient when paging (even for non-persistent messages when paging)
- // This will force everything to be persisted
- message.bodyChanged();
- }
+ PagedMessage pagedMessage;
- if (transactionID != -1)
- {
- pagedMessage = new PagedMessageImpl(message, transactionID);
- }
- else
- {
- pagedMessage = new PagedMessageImpl(message);
- }
+ if (!message.isDurable())
+ {
+ // The address should never be transient when paging (even for non-persistent messages when paging)
+ // This will force everything to be persisted
+ message.bodyChanged();
+ }
- int bytesToWrite = pagedMessage.getEncodeSize() + PageImpl.SIZE_RECORD;
-
- if (currentPageSize.addAndGet(bytesToWrite) > pageSize && currentPage.getNumberOfMessages() > 0)
- {
- // Make sure nothing is currently validating or using currentPage
- currentPageLock.writeLock().lock();
- try
+ if (transactionID != -1)
{
- openNewPage();
-
- // openNewPage will set currentPageSize to zero, we need to set it again
- currentPageSize.addAndGet(bytesToWrite);
+ pagedMessage = new PagedMessageImpl(message, transactionID);
}
- finally
+ else
{
- currentPageLock.writeLock().unlock();
+ pagedMessage = new PagedMessageImpl(message);
}
- }
- currentPageLock.readLock().lock();
+ int bytesToWrite = pagedMessage.getEncodeSize() + PageImpl.SIZE_RECORD;
- try
- {
- currentPage.write(pagedMessage);
+ if (currentPageSize.addAndGet(bytesToWrite) > pageSize && currentPage.getNumberOfMessages() > 0)
+ {
+ // Make sure nothing is currently validating or using currentPage
+ currentPageLock.writeLock().lock();
+ try
+ {
+ openNewPage();
- if (sync)
+ // openNewPage will set currentPageSize to zero, we need to set it again
+ currentPageSize.addAndGet(bytesToWrite);
+ }
+ finally
+ {
+ currentPageLock.writeLock().unlock();
+ }
+ }
+
+ currentPageLock.readLock().lock();
+
+ try
{
- currentPage.sync();
+ currentPage.write(pagedMessage);
+
+ if (sync)
+ {
+ currentPage.sync();
+ }
}
+ finally
+ {
+ currentPageLock.readLock().unlock();
+ }
+ }
- return true;
- }
- finally
- {
- currentPageLock.readLock().unlock();
- }
+ return true;
}
finally
{
@@ -940,9 +946,9 @@
// Depage has to be done atomically, in case of failure it should be
// back to where it was
-
+
byte[] duplicateIdForPage = generateDuplicateID(pageId);
-
+
Transaction depageTransaction = new TransactionImpl(storageManager);
// DuplicateCache could be null during replication
@@ -950,7 +956,8 @@
{
if (duplicateCache.contains(duplicateIdForPage))
{
- log.warn("Page " + pageId + " had been processed already but the file wasn't removed as a crash happened. Ignoring this page");
+ log.warn("Page " + pageId +
+ " had been processed already but the file wasn't removed as a crash happened. Ignoring this page");
return true;
}
@@ -1058,7 +1065,7 @@
{
// This will set the journal transaction to commit;
depageTransaction.setContainsPersistent();
-
+
entry.getKey().storeUpdate(storageManager, this.pagingManager, depageTransaction, entry.getValue().intValue());
}
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-09-21 14:18:07 UTC (rev 9708)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-09-21 22:25:29 UTC (rev 9709)
@@ -15,16 +15,15 @@
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Message;
+import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.management.ManagementHelper;
import org.hornetq.api.core.management.NotificationType;
@@ -476,10 +475,10 @@
if (addressManager.getBindingsForRoutingAddress(binding.getAddress()) == null)
{
pagingManager.deletePageStore(binding.getAddress());
-
+
managementService.unregisterAddress(binding.getAddress());
}
-
+
if (binding.getType() == BindingType.LOCAL_QUEUE)
{
managementService.unregisterQueue(uniqueName, binding.getAddress());
@@ -502,7 +501,7 @@
}
binding.close();
-
+
return binding;
}
@@ -537,7 +536,7 @@
{
route(message, new RoutingContextImpl(tx), direct);
}
-
+
public void route(final ServerMessage message, final RoutingContext context, final boolean direct) throws Exception
{
// Sanity check
@@ -547,7 +546,7 @@
}
SimpleString address = message.getAddress();
-
+
setPagingStore(message);
Object duplicateID = message.getObjectProperty(Message.HDR_DUPLICATE_DETECTION_ID);
@@ -614,17 +613,18 @@
else
{
Transaction tx = context.getTransaction();
-
+
boolean depage = tx.getProperty(TransactionPropertyIndexes.IS_DEPAGE) != null;
-
- // if the TX paged at least one message on a give address, all the other addresses should also go towards paging cache now
+
+ // if the TX paged at least one message on a given address, all the other addresses should also go towards
+ // paging cache now
boolean alreadyPaging = false;
-
+
if (tx.isPaging())
{
- alreadyPaging = getPageOperation(tx).isPaging(message.getAddress());
+ alreadyPaging = getPageOperation(tx).isPaging(message.getAddress());
}
-
+
if (!depage && message.storeIsPaging() || alreadyPaging)
{
tx.setPaging(true);
@@ -633,7 +633,7 @@
{
tx.commit();
}
-
+
return;
}
}
@@ -849,7 +849,7 @@
private void setPagingStore(final ServerMessage message) throws Exception
{
PagingStore store = pagingManager.getPageStore(message.getAddress());
-
+
message.setPagingStore(store);
}
@@ -1113,21 +1113,26 @@
private class PageMessageOperation implements TransactionOperation
{
- private final List<ServerMessage> messagesToPage = new ArrayList<ServerMessage>();
-
- private final HashSet<SimpleString> addressesPaging = new HashSet<SimpleString>();
-
+ private final HashMap<SimpleString, Pair<PagingStore, List<ServerMessage>>> pagingData = new HashMap<SimpleString, Pair<PagingStore, List<ServerMessage>>>();
+
private Transaction subTX = null;
-
+
void addMessageToPage(final ServerMessage message)
{
- messagesToPage.add(message);
- addressesPaging.add(message.getAddress());
+ Pair<PagingStore, List<ServerMessage>> pagePair = pagingData.get(message.getAddress());
+ if (pagePair == null)
+ {
+ pagePair = new Pair<PagingStore, List<ServerMessage>>(message.getPagingStore(),
+ new ArrayList<ServerMessage>());
+ pagingData.put(message.getAddress(), pagePair);
+ }
+
+ pagePair.b.add(message);
}
-
+
boolean isPaging(final SimpleString address)
{
- return addressesPaging.contains(address);
+ return pagingData.get(address) != null;
}
public void afterCommit(final Transaction tx)
@@ -1142,7 +1147,7 @@
{
pageTransaction.commit();
}
-
+
if (subTX != null)
{
subTX.afterCommit();
@@ -1178,18 +1183,18 @@
{
pageMessages(tx);
}
-
+
if (subTX != null)
{
subTX.beforeCommit();
}
-
+
}
public void beforePrepare(final Transaction tx) throws Exception
{
pageMessages(tx);
-
+
if (subTX != null)
{
subTX.beforePrepare();
@@ -1206,7 +1211,7 @@
private void pageMessages(final Transaction tx) throws Exception
{
- if (!messagesToPage.isEmpty())
+ if (!pagingData.isEmpty())
{
PageTransactionInfo pageTransaction = (PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
@@ -1223,21 +1228,33 @@
boolean pagingPersistent = false;
- Set<PagingStore> pagingStoresToSync = new HashSet<PagingStore>();
+ ArrayList<ServerMessage> nonPagedMessages = null;
- for (ServerMessage message : messagesToPage)
+ for (Pair<PagingStore, List<ServerMessage>> pair : pagingData.values())
{
- if (message.page(tx.getID()))
+
+ if (!pair.a.page(pair.b, tx.getID()))
{
- if (message.isDurable())
+ if (nonPagedMessages == null)
{
- // We only create pageTransactions if using persistent messages
+ nonPagedMessages = new ArrayList<ServerMessage>();
+ }
+ nonPagedMessages.addAll(pair.b);
+ }
+
+ for (ServerMessage msg : pair.b)
+ {
+ if (msg.isDurable())
+ {
pageTransaction.increment();
pagingPersistent = true;
- pagingStoresToSync.add(message.getPagingStore());
}
}
- else
+ }
+
+ if (nonPagedMessages != null)
+ {
+ for (ServerMessage message : nonPagedMessages)
{
// This could happen when the PageStore left the pageState
// we create a copy of the transaction so that messages are routed with the same tx ID.
@@ -1246,9 +1263,9 @@
{
subTX = tx.copy();
}
-
+
route(message, subTX, false);
-
+
if (subTX.isContainsPersistent())
{
// The route wouldn't be able to update the persistent flag on the main TX
@@ -1261,16 +1278,12 @@
if (pagingPersistent)
{
tx.setContainsPersistent();
-
- if (!pagingStoresToSync.isEmpty())
+ for (Pair<PagingStore, List<ServerMessage>> pair : pagingData.values())
{
- for (PagingStore store : pagingStoresToSync)
- {
- store.sync();
- }
+ pair.a.sync();
+ }
- pageTransaction.store(storageManager, pagingManager, tx);
- }
+ pageTransaction.store(storageManager, pagingManager, tx);
}
}
}
Modified: trunk/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/HornetQServer.java 2010-09-21 14:18:07 UTC (rev 9708)
+++ trunk/src/main/org/hornetq/core/server/HornetQServer.java 2010-09-21 22:25:29 UTC (rev 9709)
@@ -23,6 +23,7 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.DivertConfiguration;
import org.hornetq.core.management.impl.HornetQServerControlImpl;
+import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.protocol.core.Channel;
@@ -58,6 +59,8 @@
RemotingService getRemotingService();
StorageManager getStorageManager();
+
+ PagingManager getPagingManager();
ManagementService getManagementService();
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-09-21 14:18:07 UTC (rev 9708)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2010-09-21 22:25:29 UTC (rev 9709)
@@ -502,6 +502,11 @@
{
return mbeanServer;
}
+
+ public PagingManager getPagingManager()
+ {
+ return pagingManager;
+ }
public RemotingService getRemotingService()
{
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2010-09-21 14:18:07 UTC (rev 9708)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2010-09-21 22:25:29 UTC (rev 9709)
@@ -14,6 +14,7 @@
package org.hornetq.core.server.impl;
import java.io.InputStream;
+import java.util.Arrays;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.SimpleString;
@@ -269,7 +270,7 @@
{
if (pagingStore != null)
{
- return pagingStore.page(this, transactionID);
+ return pagingStore.page(Arrays.asList((ServerMessage)this), transactionID);
}
else
{
Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-09-21 14:18:07 UTC (rev 9708)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-09-21 22:25:29 UTC (rev 9709)
@@ -13,10 +13,17 @@
package org.hornetq.tests.integration.client;
+import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
@@ -33,14 +40,30 @@
import org.hornetq.api.core.client.MessageHandler;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.DivertConfiguration;
+import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.Page;
+import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.paging.PagingStoreFactory;
+import org.hornetq.core.paging.impl.PageImpl;
+import org.hornetq.core.paging.impl.PagingManagerImpl;
+import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
+import org.hornetq.core.paging.impl.PagingStoreImpl;
import org.hornetq.core.paging.impl.TestSupportPageStore;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.HornetQServerImpl;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.spi.core.security.HornetQSecurityManager;
+import org.hornetq.spi.core.security.HornetQSecurityManagerImpl;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.ExecutorFactory;
/**
* A PagingTest
@@ -129,17 +152,17 @@
server.start();
- final int numberOfIntegers = 256;
+ final int messageSize = 1024;
final int numberOfMessages = 30000;
- final byte[] body = new byte[numberOfIntegers * 4];
+ final byte[] body = new byte[messageSize];
ByteBuffer bb = ByteBuffer.wrap(body);
- for (int j = 1; j <= numberOfIntegers; j++)
+ for (int j = 1; j <= messageSize; j++)
{
- bb.putInt(j);
+ bb.put(getSamplebyte(j));
}
try
@@ -244,7 +267,7 @@
}
consumer.close();
-
+
session.close();
}
catch (Throwable e)
@@ -266,11 +289,20 @@
{
threads[i].join();
}
-
+
assertEquals(0, errors.get());
+ for (int i = 0 ; i < 20 && server.getPostOffice().getPagingManager().getTransactions().size() != 0; i++)
+ {
+ if (server.getPostOffice().getPagingManager().getTransactions().size() != 0)
+ {
+ // The delete may be asynchronous, so give it some time in case it completes later
+ Thread.sleep(500);
+ }
+ }
+
assertEquals(0, server.getPostOffice().getPagingManager().getTransactions().size());
-
+
}
finally
{
@@ -740,11 +772,11 @@
message.putIntProperty(new SimpleString("id"), i);
producerTransacted.send(message);
-
+
if (i % 2 == 0)
{
System.out.println("Sending 20 msgs to make it page");
- for (int j = 0 ; j < 20; j++)
+ for (int j = 0; j < 20; j++)
{
ClientMessage msgSend = sessionNonTX.createMessage(true);
msgSend.getBodyBuffer().writeBytes(new byte[10 * 1024]);
@@ -756,7 +788,7 @@
{
System.out.println("Consuming 20 msgs to make it page");
ClientConsumer consumer = sessionNonTX.createConsumer(PagingTest.ADDRESS);
- for (int j = 0 ; j < 20; j++)
+ for (int j = 0; j < 20; j++)
{
ClientMessage msgReceived = consumer.receive(10000);
assertNotNull(msgReceived);
@@ -765,7 +797,7 @@
consumer.close();
}
}
-
+
ClientConsumer consumerNonTX = sessionNonTX.createConsumer(PagingTest.ADDRESS);
while (true)
{
@@ -777,7 +809,6 @@
msgReceived.acknowledge();
}
consumerNonTX.close();
-
ClientConsumer consumer = sessionNonTX.createConsumer(PagingTest.ADDRESS);
@@ -798,7 +829,7 @@
// System.out.println(messageID);
Assert.assertNotNull(messageID);
Assert.assertEquals("message received out of order", i, messageID.intValue());
-
+
System.out.println("MessageID = " + messageID);
message.acknowledge();
@@ -823,7 +854,6 @@
}
-
public void testDepageDuringTransaction4() throws Exception
{
clearData();
@@ -835,93 +865,88 @@
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
new HashMap<String, AddressSettings>());
-
+
server.getConfiguration().setJournalSyncNonTransactional(false);
server.getConfiguration().setJournalSyncTransactional(false);
server.start();
final AtomicInteger errors = new AtomicInteger(0);
-
+
final int messageSize = 1024; // 1k
final int numberOfMessages = 10000;
try
{
final ClientSessionFactory sf = createInVMFactory();
-
sf.setBlockOnNonDurableSend(true);
sf.setBlockOnDurableSend(true);
sf.setBlockOnAcknowledge(false);
final byte[] body = new byte[messageSize];
-
-
+
Thread producerThread = new Thread()
{
- public void run()
- {
- ClientSession sessionProducer = null;
- try
- {
- sessionProducer = sf.createSession(false, false);
- ClientProducer producer = sessionProducer.createProducer(ADDRESS);
-
- for (int i = 0 ; i < numberOfMessages; i++)
- {
- ClientMessage msg = sessionProducer.createMessage(true);
- msg.getBodyBuffer().writeBytes(body);
- msg.putIntProperty("count", i);
- producer.send(msg);
-
- if (i % 50 == 0 && i != 0)
- {
- sessionProducer.commit();
- //Thread.sleep(500);
- }
- }
-
- sessionProducer.commit();
-
- System.out.println("Producer gone");
-
-
-
- }
- catch (Throwable e)
- {
- e.printStackTrace(); // >> junit report
- errors.incrementAndGet();
- }
- finally
- {
- try
- {
- if (sessionProducer != null)
- {
- sessionProducer.close();
- }
- }
- catch (Throwable e)
- {
- e.printStackTrace();
- errors.incrementAndGet();
- }
- }
- }
+ public void run()
+ {
+ ClientSession sessionProducer = null;
+ try
+ {
+ sessionProducer = sf.createSession(false, false);
+ ClientProducer producer = sessionProducer.createProducer(ADDRESS);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = sessionProducer.createMessage(true);
+ msg.getBodyBuffer().writeBytes(body);
+ msg.putIntProperty("count", i);
+ producer.send(msg);
+
+ if (i % 50 == 0 && i != 0)
+ {
+ sessionProducer.commit();
+ // Thread.sleep(500);
+ }
+ }
+
+ sessionProducer.commit();
+
+ System.out.println("Producer gone");
+
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace(); // >> junit report
+ errors.incrementAndGet();
+ }
+ finally
+ {
+ try
+ {
+ if (sessionProducer != null)
+ {
+ sessionProducer.close();
+ }
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ }
};
-
+
ClientSession session = sf.createSession(true, true, 0);
session.start();
session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
-
+
producerThread.start();
-
+
ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
-
-
- for (int i = 0 ; i < numberOfMessages; i++)
+
+ for (int i = 0; i < numberOfMessages; i++)
{
ClientMessage msg = consumer.receive(500000);
assertNotNull(msg);
@@ -929,15 +954,15 @@
msg.acknowledge();
if (i > 0 && i % 10 == 0)
{
- //session.commit();
+ // session.commit();
}
}
- //session.commit();
-
+ // session.commit();
+
session.close();
-
+
producerThread.join();
-
+
assertEquals(0, errors.get());
}
finally
@@ -953,6 +978,361 @@
}
+ // This test will force a depage thread as soon as the first message hits the page
+ public void testDepageOnTX5() throws Exception
+ {
+ clearData();
+
+ final Configuration config = createDefaultConfig();
+ HornetQSecurityManager securityManager = new HornetQSecurityManagerImpl();
+
+ final Executor executor = Executors.newSingleThreadExecutor();
+
+ final AtomicInteger countDepage = new AtomicInteger(0);
+ class HackPagingStore extends PagingStoreImpl
+ {
+ HackPagingStore(final SimpleString address,
+ final PagingManager pagingManager,
+ final StorageManager storageManager,
+ final PostOffice postOffice,
+ final SequentialFileFactory fileFactory,
+ final PagingStoreFactory storeFactory,
+ final SimpleString storeName,
+ final AddressSettings addressSettings,
+ final Executor executor,
+ final boolean syncNonTransactional)
+ {
+ super(address,
+ pagingManager,
+ storageManager,
+ postOffice,
+ fileFactory,
+ storeFactory,
+ storeName,
+ addressSettings,
+ executor,
+ syncNonTransactional);
+ }
+
+ protected boolean page(final List<ServerMessage> messages, final long transactionID, final boolean sync) throws Exception
+ {
+ boolean paged = super.page(messages, transactionID, sync);
+
+ if (paged)
+ {
+
+ if (countDepage.incrementAndGet() == 1)
+ {
+ countDepage.set(0);
+
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ while (isStarted() && readPage());
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ });
+ }
+ }
+
+ return paged;
+ }
+
+ public boolean startDepaging()
+ {
+ // do nothing, we are hacking depage right in between paging
+ return false;
+ }
+
+ };
+
+ class HackStoreFactory extends PagingStoreFactoryNIO
+ {
+ HackStoreFactory(final String directory,
+ final ExecutorFactory executorFactory,
+ final boolean syncNonTransactional)
+ {
+ super(directory, executorFactory, syncNonTransactional);
+ }
+
+ public synchronized PagingStore newStore(final SimpleString address, final AddressSettings settings) throws Exception
+ {
+
+ return new HackPagingStore(address,
+ getPagingManager(),
+ getStorageManager(),
+ getPostOffice(),
+ null,
+ this,
+ address,
+ settings,
+ getExecutorFactory().getExecutor(),
+ syncNonTransactional);
+ }
+
+ };
+
+ HornetQServer server = new HornetQServerImpl(config, ManagementFactory.getPlatformMBeanServer(), securityManager)
+
+ {
+ protected PagingManager createPagingManager()
+ {
+ return new PagingManagerImpl(new HackStoreFactory(config.getPagingDirectory(),
+ getExecutorFactory(),
+ config.isJournalSyncNonTransactional()),
+ getStorageManager(),
+ getAddressSettingsRepository());
+ }
+ };
+
+ AddressSettings defaultSetting = new AddressSettings();
+ defaultSetting.setPageSizeBytes(PAGE_SIZE);
+ defaultSetting.setMaxSizeBytes(PAGE_MAX);
+ server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+ server.start();
+
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ final int messageSize = 1024; // 1k
+ final int numberOfMessages = 2000;
+
+ try
+ {
+ final ClientSessionFactory sf = createInVMFactory();
+
+ sf.setBlockOnNonDurableSend(true);
+ sf.setBlockOnDurableSend(true);
+ sf.setBlockOnAcknowledge(false);
+
+ final byte[] body = new byte[messageSize];
+
+ Thread producerThread = new Thread()
+ {
+ public void run()
+ {
+ ClientSession sessionProducer = null;
+ try
+ {
+ sessionProducer = sf.createSession(false, false);
+ ClientProducer producer = sessionProducer.createProducer(ADDRESS);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = sessionProducer.createMessage(true);
+ msg.getBodyBuffer().writeBytes(body);
+ msg.putIntProperty("count", i);
+ producer.send(msg);
+
+ if (i % 500 == 0 && i != 0)
+ {
+ sessionProducer.commit();
+ // Thread.sleep(500);
+ }
+ }
+
+ sessionProducer.commit();
+
+ System.out.println("Producer gone");
+
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace(); // >> junit report
+ errors.incrementAndGet();
+ }
+ finally
+ {
+ try
+ {
+ if (sessionProducer != null)
+ {
+ sessionProducer.close();
+ }
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ }
+ };
+
+ ClientSession session = sf.createSession(true, true, 0);
+ session.start();
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+ producerThread.start();
+
+ ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = consumer.receive(500000);
+ assertNotNull(msg);
+ assertEquals(i, msg.getIntProperty("count").intValue());
+ msg.acknowledge();
+ if (i > 0 && i % 10 == 0)
+ {
+ // session.commit();
+ }
+ }
+ // session.commit();
+
+ session.close();
+
+ producerThread.join();
+
+ assertEquals(0, errors.get());
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
+ public void testOrderingNonTX() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ final HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_SIZE * 2,
+ new HashMap<String, AddressSettings>());
+
+ server.getConfiguration().setJournalSyncNonTransactional(false);
+ server.getConfiguration().setJournalSyncTransactional(false);
+
+ server.start();
+
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ final int messageSize = 1024; // 1k
+ final int numberOfMessages = 2000;
+
+ try
+ {
+ final ClientSessionFactory sf = createInVMFactory();
+
+ sf.setBlockOnNonDurableSend(false);
+ sf.setBlockOnDurableSend(true);
+ sf.setBlockOnAcknowledge(false);
+
+ final CountDownLatch ready = new CountDownLatch(1);
+
+ final byte[] body = new byte[messageSize];
+
+ Thread producerThread = new Thread()
+ {
+ public void run()
+ {
+ ClientSession sessionProducer = null;
+ try
+ {
+ sessionProducer = sf.createSession(true, true);
+ ClientProducer producer = sessionProducer.createProducer(ADDRESS);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = sessionProducer.createMessage(true);
+ msg.getBodyBuffer().writeBytes(body);
+ msg.putIntProperty("count", i);
+ producer.send(msg);
+
+ if (i == 1000)
+ {
+ // The session is not TX, but we do this just to perform a round trip to the server
+ // and make sure there are no pending messages
+ sessionProducer.commit();
+
+ assertTrue(server.getPagingManager().getPageStore(ADDRESS).isPaging());
+ ready.countDown();
+ }
+ }
+
+ sessionProducer.commit();
+
+ System.out.println("Producer gone");
+
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace(); // >> junit report
+ errors.incrementAndGet();
+ }
+ finally
+ {
+ try
+ {
+ if (sessionProducer != null)
+ {
+ sessionProducer.close();
+ }
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+ }
+ }
+ };
+
+ ClientSession session = sf.createSession(true, true, 0);
+ session.start();
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
+ producerThread.start();
+
+ assertTrue(ready.await(10, TimeUnit.SECONDS));
+
+ ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = consumer.receive(500000);
+ assertNotNull(msg);
+ assertEquals(i, msg.getIntProperty("count").intValue());
+ msg.acknowledge();
+ }
+
+ session.close();
+
+ producerThread.join();
+
+ assertEquals(0, errors.get());
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
public void testPageOnSchedulingNoRestart() throws Exception
{
internalTestPageOnScheduling(false);
14 years, 5 months
JBoss hornetq SVN: r9708 - in branches/hornetq-416: src/main/org/hornetq/core/protocol/core/impl and 4 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-09-21 10:18:07 -0400 (Tue, 21 Sep 2010)
New Revision: 9708
Added:
branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSConnectionInfo.java
Modified:
branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSServerControl.java
branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
branches/hornetq-416/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
branches/hornetq-416/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java
branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java
branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-416
* add JMSConnectionInfo and JMSServerControl.listConnectionsAsJSON()
* add RemotingConnection.getCreationTime() to know when a connection has been created
Added: branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSConnectionInfo.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSConnectionInfo.java (rev 0)
+++ branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSConnectionInfo.java 2010-09-21 14:18:07 UTC (rev 9708)
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.api.jms.management;
+
+import org.hornetq.utils.json.JSONArray;
+import org.hornetq.utils.json.JSONObject;
+
+/**
+ * A JMSConnectionInfo
+ *
+ * @author jmesnil
+ *
+ *
+ */
+public class JMSConnectionInfo
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private final String connectionID;
+
+ private final String clientAddress;
+
+ private final long creationTime;
+
+ // TODO
+ // user name
+ // client ID
+
+ // Static --------------------------------------------------------
+
+ public static JMSConnectionInfo[] from(final String jsonString) throws Exception
+ {
+ JSONArray array = new JSONArray(jsonString);
+ JMSConnectionInfo[] infos = new JMSConnectionInfo[array.length()];
+ for (int i = 0; i < array.length(); i++)
+ {
+ JSONObject obj = array.getJSONObject(i);
+ JMSConnectionInfo info = new JMSConnectionInfo(obj.getString("connectionID"),
+ obj.getString("clientAddress"),
+ obj.getLong("creationTime"));
+ infos[i] = info;
+ }
+ return infos;
+ }
+
+ // Constructors --------------------------------------------------
+
+ private JMSConnectionInfo(final String connectionID,
+ final String clientAddress,
+ final long creationTime)
+ {
+ this.connectionID = connectionID;
+ this.clientAddress = clientAddress;
+ this.creationTime = creationTime;
+ }
+
+ // Public --------------------------------------------------------
+
+ public String getConnectionID()
+ {
+ return connectionID;
+ }
+
+ public String getClientAddress()
+ {
+ return clientAddress;
+ }
+
+ public long getCreationTime()
+ {
+ return creationTime;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSServerControl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSServerControl.java 2010-09-21 13:38:26 UTC (rev 9707)
+++ branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSServerControl.java 2010-09-21 14:18:07 UTC (rev 9708)
@@ -228,6 +228,8 @@
@Operation(desc = "List all the connection IDs", impact = MBeanOperationInfo.INFO)
String[] listConnectionIDs() throws Exception;
+ String listConnectionsAsJSON() throws Exception;
+
/**
* Lists all the sessions IDs for the specified connection ID.
*/
Modified: branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-09-21 13:38:26 UTC (rev 9707)
+++ branches/hornetq-416/src/main/org/hornetq/core/protocol/core/impl/RemotingConnectionImpl.java 2010-09-21 14:18:07 UTC (rev 9708)
@@ -89,6 +89,8 @@
private volatile boolean executing;
+ private final long creationTime;
+
// Constructors
// ---------------------------------------------------------------------------------
@@ -129,6 +131,8 @@
this.client = client;
this.executor = executor;
+
+ this.creationTime = System.currentTimeMillis();
}
// RemotingConnection implementation
@@ -160,6 +164,11 @@
{
return transportConnection.getRemoteAddress();
}
+
+ public long getCreationTime()
+ {
+ return creationTime;
+ }
public synchronized Channel getChannel(final long channelID, final int confWindowSize)
{
Modified: branches/hornetq-416/src/main/org/hornetq/core/protocol/stomp/StompConnection.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-09-21 13:38:26 UTC (rev 9707)
+++ branches/hornetq-416/src/main/org/hornetq/core/protocol/stomp/StompConnection.java 2010-09-21 14:18:07 UTC (rev 9708)
@@ -49,12 +49,16 @@
private boolean valid;
private boolean destroyed = false;
+
+ private final long creationTime;
StompConnection(final Connection transportConnection, final StompProtocolManager manager)
{
this.transportConnection = transportConnection;
this.manager = manager;
+
+ this.creationTime = System.currentTimeMillis();
}
public void addCloseListener(CloseListener listener)
@@ -117,6 +121,11 @@
{
return transportConnection.getRemoteAddress();
}
+
+ public long getCreationTime()
+ {
+ return creationTime;
+ }
public Connection getTransportConnection()
{
Modified: branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2010-09-21 13:38:26 UTC (rev 9707)
+++ branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSServerControlImpl.java 2010-09-21 14:18:07 UTC (rev 9708)
@@ -16,6 +16,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.ListenerNotFoundException;
@@ -31,13 +32,16 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.management.ManagementHelper;
-import org.hornetq.api.core.management.Parameter;
import org.hornetq.api.jms.management.ConnectionFactoryControl;
import org.hornetq.api.jms.management.JMSQueueControl;
import org.hornetq.api.jms.management.JMSServerControl;
import org.hornetq.api.jms.management.TopicControl;
import org.hornetq.core.management.impl.MBeanInfoHelper;
+import org.hornetq.core.server.ServerSession;
import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.spi.core.protocol.RemotingConnection;
+import org.hornetq.utils.json.JSONArray;
+import org.hornetq.utils.json.JSONObject;
/**
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
@@ -567,7 +571,34 @@
blockOnIO();
}
}
+
+ public String listConnectionsAsJSON() throws Exception
+ {
+ checkStarted();
+ clearIO();
+
+ try
+ {
+ JSONArray array = new JSONArray();
+
+ Set<RemotingConnection> connections = server.getHornetQServer().getRemotingService().getConnections();
+ for (RemotingConnection connection : connections)
+ {
+ JSONObject obj = new JSONObject();
+ obj.put("connectionID", connection.getID().toString());
+ obj.put("clientAddress", connection.getRemoteAddress());
+ obj.put("creationTime", connection.getCreationTime());
+ array.put(obj);
+ }
+ return array.toString();
+ }
+ finally
+ {
+ blockOnIO();
+ }
+ }
+
public String[] listSessions(final String connectionID) throws Exception
{
checkStarted();
Modified: branches/hornetq-416/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java 2010-09-21 13:38:26 UTC (rev 9707)
+++ branches/hornetq-416/src/main/org/hornetq/spi/core/protocol/RemotingConnection.java 2010-09-21 14:18:07 UTC (rev 9708)
@@ -38,6 +38,11 @@
Object getID();
/**
+ * Returns the creation time of the Remoting connection
+ */
+ long getCreationTime();
+
+ /**
* returns a string representation of the remote address of this connection
*
* @return the remote address
Modified: branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java
===================================================================
--- branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java 2010-09-21 13:38:26 UTC (rev 9707)
+++ branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControl2Test.java 2010-09-21 14:18:07 UTC (rev 9708)
@@ -25,6 +25,7 @@
import junit.framework.Assert;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.jms.management.JMSConnectionInfo;
import org.hornetq.api.jms.management.JMSServerControl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
@@ -139,7 +140,17 @@
{
doListConnectionIDs(NettyAcceptorFactory.class.getName(), NettyConnectorFactory.class.getName());
}
+
+ public void testListConnectionsAsJSONForNetty() throws Exception
+ {
+ doListConnectionsAsJSON(NettyAcceptorFactory.class.getName(), NettyConnectorFactory.class.getName());
+ }
+ public void testListConnectionsAsJSONForInVM() throws Exception
+ {
+ doListConnectionsAsJSON(InVMAcceptorFactory.class.getName(), InVMConnectorFactory.class.getName());
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -207,6 +218,80 @@
}
}
+ private void doListConnectionsAsJSON(final String acceptorFactory, final String connectorFactory) throws Exception
+ {
+ try
+ {
+ startHornetQServer(acceptorFactory);
+
+ JMSServerControl control = createManagementControl();
+
+ long startTime = System.currentTimeMillis();
+
+ String jsonStr = control.listConnectionsAsJSON();
+ assertNotNull(jsonStr);
+ JMSConnectionInfo[] infos = JMSConnectionInfo.from(jsonStr);
+ assertEquals(0, infos.length);
+
+ ConnectionFactory cf1 = JMSUtil.createFactory(connectorFactory,
+ JMSServerControl2Test.CONNECTION_TTL,
+ JMSServerControl2Test.PING_PERIOD);
+ Connection connection = cf1.createConnection();
+
+ jsonStr = control.listConnectionsAsJSON();
+ assertNotNull(jsonStr);
+ infos = JMSConnectionInfo.from(jsonStr);
+ assertEquals(1, infos.length);
+ for (JMSConnectionInfo info : infos)
+ {
+ assertNotNull(info.getConnectionID());
+ assertNotNull(info.getClientAddress());
+ assertTrue(startTime < info.getCreationTime() && info.getCreationTime() < System.currentTimeMillis());
+ }
+
+ ConnectionFactory cf2 = JMSUtil.createFactory(connectorFactory,
+ JMSServerControl2Test.CONNECTION_TTL,
+ JMSServerControl2Test.PING_PERIOD);
+ Connection connection2 = cf2.createConnection();
+
+ jsonStr = control.listConnectionsAsJSON();
+ assertNotNull(jsonStr);
+ infos = JMSConnectionInfo.from(jsonStr);
+ assertEquals(2, infos.length);
+ for (JMSConnectionInfo info : infos)
+ {
+ assertNotNull(info.getConnectionID());
+ assertNotNull(info.getClientAddress());
+ assertTrue(startTime < info.getCreationTime() && info.getCreationTime() < System.currentTimeMillis());
+ }
+
+ connection.close();
+
+ waitForConnectionIDs(1, control);
+
+ connection2.close();
+
+ waitForConnectionIDs(0, control);
+
+ jsonStr = control.listConnectionsAsJSON();
+ assertNotNull(jsonStr);
+ infos = JMSConnectionInfo.from(jsonStr);
+ assertEquals(0, infos.length);
+ }
+ finally
+ {
+ if (serverManager != null)
+ {
+ serverManager.stop();
+ }
+
+ if (server != null)
+ {
+ server.stop();
+ }
+ }
+ }
+
private void waitForConnectionIDs(final int num, final JMSServerControl control) throws Exception
{
final long timeout = 10000;
Modified: branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java
===================================================================
--- branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2010-09-21 13:38:26 UTC (rev 9707)
+++ branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSServerControlUsingJMSTest.java 2010-09-21 14:18:07 UTC (rev 9708)
@@ -233,6 +233,11 @@
{
return (String[])proxy.invokeOperation("listConnectionIDs");
}
+
+ public String listConnectionsAsJSON() throws Exception
+ {
+ return (String)proxy.invokeOperation("listConnectionsAsJSON");
+ }
public String[] listRemoteAddresses() throws Exception
{
14 years, 5 months
JBoss hornetq SVN: r9707 - trunk/src/main/org/hornetq/core/postoffice/impl.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-09-21 09:38:26 -0400 (Tue, 21 Sep 2010)
New Revision: 9707
Modified:
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
Log:
https://jira.jboss.org/browse/HORNETQ-521
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-09-21 09:52:36 UTC (rev 9706)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-09-21 13:38:26 UTC (rev 9707)
@@ -483,24 +483,24 @@
if (binding.getType() == BindingType.LOCAL_QUEUE)
{
managementService.unregisterQueue(uniqueName, binding.getAddress());
+
+ TypedProperties props = new TypedProperties();
+
+ props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
+
+ props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, binding.getClusterName());
+
+ props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
+
+ props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
+
+ managementService.sendNotification(new Notification(null, NotificationType.BINDING_REMOVED, props));
}
else if (binding.getType() == BindingType.DIVERT)
{
managementService.unregisterDivert(uniqueName);
}
- TypedProperties props = new TypedProperties();
-
- props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
-
- props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, binding.getClusterName());
-
- props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
-
- props.putIntProperty(ManagementHelper.HDR_DISTANCE, binding.getDistance());
-
- managementService.sendNotification(new Notification(null, NotificationType.BINDING_REMOVED, props));
-
binding.close();
return binding;
14 years, 5 months
JBoss hornetq SVN: r9706 - trunk/src/main/org/hornetq/core/persistence/impl/journal.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-09-21 05:52:36 -0400 (Tue, 21 Sep 2010)
New Revision: 9706
Modified:
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
https://jira.jboss.org/browse/HORNETQ-506
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-09-21 09:31:13 UTC (rev 9705)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-09-21 09:52:36 UTC (rev 9706)
@@ -168,11 +168,10 @@
private final String journalDir;
private final String largeMessagesDirectory;
-
-
+
// Persisted core configuration
private final Map<SimpleString, PersistedRoles> mapPersistedRoles = new ConcurrentHashMap<SimpleString, PersistedRoles>();
-
+
private final Map<SimpleString, PersistedAddressSetting> mapPersistedAddressSettings = new ConcurrentHashMap<SimpleString, PersistedAddressSetting>();
public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory)
@@ -585,7 +584,11 @@
public void updatePageTransaction(final long txID, final PageTransactionInfo pageTransaction, final int depages) throws Exception
{
- messageJournal.appendUpdateRecordTransactional(txID, pageTransaction.getRecordID(), JournalStorageManager.PAGE_TRANSACTION, new PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages));
+ messageJournal.appendUpdateRecordTransactional(txID,
+ pageTransaction.getRecordID(),
+ JournalStorageManager.PAGE_TRANSACTION,
+ new PageUpdateTXEncoding(pageTransaction.getTransactionID(),
+ depages));
}
public void storeReferenceTransactional(final long txID, final long queueID, final long messageID) throws Exception
@@ -697,8 +700,7 @@
getContext(syncNonTransactional));
}
-
-
+
public void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception
{
deleteAddressSetting(addressSetting.getAddressMatch());
@@ -707,14 +709,13 @@
bindingsJournal.appendAddRecord(id, ADDRESS_SETTING_RECORD, addressSetting, true);
mapPersistedAddressSettings.put(addressSetting.getAddressMatch(), addressSetting);
}
-
+
public List<PersistedAddressSetting> recoverAddressSettings() throws Exception
{
ArrayList<PersistedAddressSetting> list = new ArrayList<PersistedAddressSetting>(mapPersistedAddressSettings.size());
list.addAll(mapPersistedAddressSettings.values());
return list;
}
-
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#recoverPersistedRoles()
@@ -746,9 +747,9 @@
{
bindingsJournal.appendDeleteRecord(oldSetting.getStoreId(), false);
}
-
+
}
-
+
public void deleteSecurityRoles(SimpleString addressMatch) throws Exception
{
PersistedRoles oldRoles = mapPersistedRoles.remove(addressMatch);
@@ -758,8 +759,6 @@
}
}
-
-
public JournalLoadInformation loadMessageJournal(final PostOffice postOffice,
final PagingManager pagingManager,
final ResourceManager resourceManager,
@@ -771,7 +770,7 @@
List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
Map<Long, ServerMessage> messages = new HashMap<Long, ServerMessage>();
-
+
JournalLoadInformation info = messageJournal.load(records,
preparedTransactions,
new LargeMessageTXFailureCallback(messages));
@@ -781,17 +780,17 @@
Map<Long, Map<Long, AddMessageRecord>> queueMap = new HashMap<Long, Map<Long, AddMessageRecord>>();
final int totalSize = records.size();
-
- for (int reccount = 0 ; reccount < totalSize; reccount++)
+
+ for (int reccount = 0; reccount < totalSize; reccount++)
{
// It will show log.info only with large journals (more than 1 million records)
- if (reccount> 0 && reccount % 1000000 == 0)
+ if (reccount > 0 && reccount % 1000000 == 0)
{
long percent = (long)((((double)reccount) / ((double)totalSize)) * 100f);
-
+
log.info(percent + "% loaded");
}
-
+
RecordInfo record = records.get(reccount);
byte[] data = record.data;
@@ -885,15 +884,15 @@
if (queueMessages == null)
{
- log.warn("Cannot find queue " + encoding.queueID + " to update delivery count");
+ log.warn("Cannot find queue " + encoding.queueID + " to update delivery count");
}
else
{
AddMessageRecord rec = queueMessages.get(messageID);
-
+
if (rec == null)
{
- log.warn("Cannot find message " + messageID + " to update delivery count");
+ log.warn("Cannot find message " + messageID + " to update delivery count");
}
else
{
@@ -908,21 +907,21 @@
if (record.isUpdate)
{
PageUpdateTXEncoding pageUpdate = new PageUpdateTXEncoding();
-
+
pageUpdate.decode(buff);
-
+
PageTransactionInfo pageTX = pagingManager.getTransaction(pageUpdate.pageTX);
-
+
pageTX.update(pageUpdate.recods, null, null);
}
else
{
PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
-
+
pageTransactionInfo.decode(buff);
-
+
pageTransactionInfo.setRecordID(record.id);
-
+
pagingManager.addTransaction(pageTransactionInfo);
}
@@ -985,13 +984,13 @@
throw new IllegalStateException("Invalid record type " + recordType);
}
}
-
+
// This will free up memory sooner. The record is not needed any more
// and its byte array would consume memory during the load process even though it's not necessary any longer
// what would delay processing time during load
records.set(reccount, null);
}
-
+
// Release the memory as soon as not needed any longer
records.clear();
records = null;
@@ -1003,7 +1002,14 @@
Map<Long, AddMessageRecord> queueRecords = entry.getValue();
Queue queue = queues.get(queueID);
-
+
+ if (queue == null)
+ {
+ log.warn("Message for queue " + queueID + " which does not exist. This message will be ignored.");
+
+ continue;
+ }
+
Collection<AddMessageRecord> valueRecords = queueRecords.values();
for (AddMessageRecord record : valueRecords)
@@ -1037,7 +1043,7 @@
msg.decrementDelayDeletionCount();
}
}
-
+
for (ServerMessage msg : messages.values())
{
if (msg.getRefCount() == 0)
@@ -1053,7 +1059,7 @@
}
}
}
-
+
if (perfBlastPages != -1)
{
messageJournal.perfBlast(perfBlastPages);
@@ -1418,18 +1424,22 @@
if (queue == null)
{
- throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
+ log.warn("Message in prepared tx for queue " + encoding.queueID +
+ " which does not exist. This message will be ignored.");
+
}
+ else
+ {
+ ServerMessage message = messages.get(messageID);
- ServerMessage message = messages.get(messageID);
+ if (message == null)
+ {
+ throw new IllegalStateException("Cannot find message with id " + messageID);
+ }
- if (message == null)
- {
- throw new IllegalStateException("Cannot find message with id " + messageID);
+ postOffice.reroute(message, queue, tx);
}
- postOffice.reroute(message, queue, tx);
-
break;
}
case ACKNOWLEDGE_REF:
@@ -1446,7 +1456,7 @@
{
throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
}
-
+
// TODO - this involves a scan - we should find a quicker way of doing it
MessageReference removed = queue.removeReferenceWithID(messageID);
@@ -2017,24 +2027,24 @@
super(queueID);
}
}
-
+
private static class PageUpdateTXEncoding implements EncodingSupport
{
-
+
public long pageTX;
-
+
public int recods;
-
+
public PageUpdateTXEncoding()
{
}
-
+
public PageUpdateTXEncoding(final long pageTX, final int records)
{
this.pageTX = pageTX;
this.recods = records;
}
-
+
public void decode(HornetQBuffer buffer)
{
this.pageTX = buffer.readLong();
@@ -2057,7 +2067,7 @@
{
return DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
}
-
+
}
private static class ScheduledDeliveryEncoding extends QueueEncoding
@@ -2182,6 +2192,7 @@
}
}
+
private static final class AddMessageRecord
{
public AddMessageRecord(final ServerMessage message)
@@ -2194,7 +2205,7 @@
long scheduledDeliveryTime;
int deliveryCount;
-
+
boolean referenced = false;
}
14 years, 5 months
JBoss hornetq SVN: r9705 - in branches/hornetq-416/src/main/org/hornetq: jms/management/impl and 1 other directory.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-09-21 05:31:13 -0400 (Tue, 21 Sep 2010)
New Revision: 9705
Modified:
branches/hornetq-416/src/main/org/hornetq/api/jms/management/SubscriptionInfo.java
branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
Log:
https://jira.jboss.org/browse/HORNETQ-416
* add deliveringCount to the SubscriptionInfo returned by JMSTopicControl.listXXXSubscriptions() methods
Modified: branches/hornetq-416/src/main/org/hornetq/api/jms/management/SubscriptionInfo.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/api/jms/management/SubscriptionInfo.java 2010-09-21 09:22:42 UTC (rev 9704)
+++ branches/hornetq-416/src/main/org/hornetq/api/jms/management/SubscriptionInfo.java 2010-09-21 09:31:13 UTC (rev 9705)
@@ -40,6 +40,8 @@
private final int messageCount;
+ private final int deliveringCount;
+
// Static --------------------------------------------------------
/**
@@ -58,7 +60,8 @@
sub.optString("name", null),
sub.getBoolean("durable"),
sub.optString("selector", null),
- sub.getInt("messageCount"));
+ sub.getInt("messageCount"),
+ sub.getInt("deliveringCount"));
infos[i] = info;
}
@@ -72,7 +75,8 @@
final String name,
final boolean durable,
final String selector,
- final int messageCount)
+ final int messageCount,
+ final int deliveringCount)
{
this.queueName = queueName;
this.clientID = clientID;
@@ -80,6 +84,7 @@
this.durable = durable;
this.selector = selector;
this.messageCount = messageCount;
+ this.deliveringCount = deliveringCount;
}
// Public --------------------------------------------------------
@@ -131,6 +136,14 @@
{
return messageCount;
}
+
+ /**
+ * Returns the number of messages currently delivered to this subscription.
+ */
+ public int getDeliveringCount()
+ {
+ return deliveringCount;
+ }
// Package protected ---------------------------------------------
Modified: branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java 2010-09-21 09:22:42 UTC (rev 9704)
+++ branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java 2010-09-21 09:31:13 UTC (rev 9705)
@@ -337,6 +337,7 @@
info.put("name", subName);
info.put("durable", queue.isDurable());
info.put("messageCount", queue.getMessageCount());
+ info.put("deliveringCount", queue.getDeliveringCount());
array.put(info);
}
14 years, 5 months
JBoss hornetq SVN: r9704 - in trunk: tests/src/org/hornetq/tests/unit/core/deployers/impl and 1 other directory.
by do-not-reply@jboss.org
Author: timfox
Date: 2010-09-21 05:22:42 -0400 (Tue, 21 Sep 2010)
New Revision: 9704
Modified:
trunk/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/SecurityDeployerTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-501
Modified: trunk/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
===================================================================
--- trunk/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2010-09-21 09:09:50 UTC (rev 9703)
+++ trunk/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2010-09-21 09:22:42 UTC (rev 9704)
@@ -686,31 +686,31 @@
}
else if (FileConfigurationParser.CREATEDURABLEQUEUE_NAME.equals(type))
{
- createDurableQueue.add(role);
+ createDurableQueue.add(role.trim());
}
else if (FileConfigurationParser.DELETEDURABLEQUEUE_NAME.equals(type))
{
- deleteDurableQueue.add(role);
+ deleteDurableQueue.add(role.trim());
}
else if (FileConfigurationParser.CREATE_NON_DURABLE_QUEUE_NAME.equals(type))
{
- createNonDurableQueue.add(role);
+ createNonDurableQueue.add(role.trim());
}
else if (FileConfigurationParser.DELETE_NON_DURABLE_QUEUE_NAME.equals(type))
{
- deleteNonDurableQueue.add(role);
+ deleteNonDurableQueue.add(role.trim());
}
else if (FileConfigurationParser.CREATETEMPQUEUE_NAME.equals(type))
{
- createNonDurableQueue.add(role);
+ createNonDurableQueue.add(role.trim());
}
else if (FileConfigurationParser.DELETETEMPQUEUE_NAME.equals(type))
{
- deleteNonDurableQueue.add(role);
+ deleteNonDurableQueue.add(role.trim());
}
else if (FileConfigurationParser.MANAGE_NAME.equals(type))
{
- manageRoles.add(role);
+ manageRoles.add(role.trim());
}
if (!allRoles.contains(role.trim()))
{
Modified: trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/SecurityDeployerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/SecurityDeployerTest.java 2010-09-21 09:09:50 UTC (rev 9703)
+++ trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/SecurityDeployerTest.java 2010-09-21 09:22:42 UTC (rev 9704)
@@ -41,7 +41,31 @@
+ " <permission type=\"manage\" roles=\"guest,publisher,durpublisher\"/>\n"
+ " </security-setting>";
- private final String conf2 = "<security-setting match=\"jms.topic.testQueue\">\n" + " <permission type=\"createNonDurableQueue\" roles=\"durpublisher\"/>\n"
+ private final String confWithWhiteSpace1 = "<security-setting match=\"jms.topic.testTopic\">\n" +
+ " <permission type=\"createDurableQueue\" roles=\"guest, publisher, durpublisher\"/>\n" +
+ "<permission type=\"createNonDurableQueue\" roles=\"guest, publisher, durpublisher\"/>\n"
+ + " <permission type=\"deleteNonDurableQueue\" roles=\"guest, publisher, durpublisher\"/>\n"
+ + " <permission type=\"deleteDurableQueue\" roles=\"guest, publisher, durpublisher\"/>\n"
+
+ + " <permission type=\"consume\" roles=\"guest, publisher, durpublisher\"/>\n"
+ + " <permission type=\"send\" roles=\"guest, publisher, durpublisher\"/>\n"
+ + " <permission type=\"manage\" roles=\"guest, publisher, durpublisher\"/>\n"
+ + " <permission type=\"manage\" roles=\"guest, publisher, durpublisher\"/>\n"
+ + " </security-setting>";
+
+ private final String confWithWhiteSpace2 = "<security-setting match=\"jms.topic.testTopic\">\n" +
+ " <permission type=\"createDurableQueue\" roles=\"guest , publisher , durpublisher\"/>\n" +
+ "<permission type=\"createNonDurableQueue\" roles=\"guest , publisher , durpublisher\"/>\n"
+ + " <permission type=\"deleteNonDurableQueue\" roles=\"guest , publisher , durpublisher\"/>\n"
+ + " <permission type=\"deleteDurableQueue\" roles=\"guest , publisher , durpublisher\"/>\n"
+
+ + " <permission type=\"consume\" roles=\"guest , publisher , durpublisher\"/>\n"
+ + " <permission type=\"send\" roles=\"guest , publisher , durpublisher\"/>\n"
+ + " <permission type=\"manage\" roles=\"guest , publisher , durpublisher\"/>\n"
+ + " </security-setting>";
+
+ private final String conf2 = "<security-setting match=\"jms.topic.testQueue\">\n" +
+ " <permission type=\"createNonDurableQueue\" roles=\"durpublisher\"/>\n"
+ " <permission type=\"deleteNonDurableQueue\" roles=\"durpublisher\"/>\n"
+ " <permission type=\"consume\" roles=\"guest,publisher,durpublisher\"/>\n"
+ " <permission type=\"send\" roles=\"guest,publisher,durpublisher\"/>\n"
@@ -106,7 +130,63 @@
}
}
}
+
+ public void testWithWhiteSpace1() throws Exception
+ {
+ testWithWhiteSpace(confWithWhiteSpace1);
+ }
+
+ public void testWithWhiteSpace2() throws Exception
+ {
+ testWithWhiteSpace(confWithWhiteSpace2);
+ }
+ private void testWithWhiteSpace(String conf) throws Exception
+ {
+ Element e = org.hornetq.utils.XMLUtil.stringToElement(confWithWhiteSpace1);
+ deployer.deploy(e);
+ HashSet<Role> roles = (HashSet<Role>)repository.getMatch("jms.topic.testTopic");
+ Assert.assertNotNull(roles);
+ Assert.assertEquals(3, roles.size());
+ for (Role role : roles)
+ {
+ if (role.getName().equals("guest"))
+ {
+ Assert.assertTrue(role.isConsume());
+ Assert.assertTrue(role.isCreateDurableQueue());
+ Assert.assertTrue(role.isCreateNonDurableQueue());
+ Assert.assertTrue(role.isDeleteDurableQueue());
+ Assert.assertTrue(role.isDeleteNonDurableQueue());
+ Assert.assertTrue(role.isManage());
+ Assert.assertTrue(role.isSend());
+ }
+ else if (role.getName().equals("publisher"))
+ {
+ Assert.assertTrue(role.isConsume());
+ Assert.assertTrue(role.isCreateDurableQueue());
+ Assert.assertTrue(role.isCreateNonDurableQueue());
+ Assert.assertTrue(role.isDeleteDurableQueue());
+ Assert.assertTrue(role.isDeleteNonDurableQueue());
+ Assert.assertTrue(role.isManage());
+ Assert.assertTrue(role.isSend());
+ }
+ else if (role.getName().equals("durpublisher"))
+ {
+ Assert.assertTrue(role.isConsume());
+ Assert.assertTrue(role.isCreateDurableQueue());
+ Assert.assertTrue(role.isCreateNonDurableQueue());
+ Assert.assertTrue(role.isDeleteDurableQueue());
+ Assert.assertTrue(role.isDeleteNonDurableQueue());
+ Assert.assertTrue(role.isManage());
+ Assert.assertTrue(role.isSend());
+ }
+ else
+ {
+ Assert.fail("unexpected role");
+ }
+ }
+ }
+
public void testMultiple() throws Exception
{
deployer.deploy(org.hornetq.utils.XMLUtil.stringToElement(conf));
14 years, 5 months
JBoss hornetq SVN: r9703 - in branches/hornetq-416: src/main/org/hornetq/jms/management/impl and 1 other directory.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-09-21 05:09:50 -0400 (Tue, 21 Sep 2010)
New Revision: 9703
Modified:
branches/hornetq-416/src/main/org/hornetq/api/jms/management/DestinationControl.java
branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSQueueControl.java
branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java
branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-416
* add messagesAdded and deliveringCount attributes to JMS DestinationControl
Modified: branches/hornetq-416/src/main/org/hornetq/api/jms/management/DestinationControl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/api/jms/management/DestinationControl.java 2010-09-21 08:53:23 UTC (rev 9702)
+++ branches/hornetq-416/src/main/org/hornetq/api/jms/management/DestinationControl.java 2010-09-21 09:09:50 UTC (rev 9703)
@@ -47,6 +47,16 @@
*/
int getMessageCount() throws Exception;
+ /**
+ * Returns the number of messages that this destination is currently delivering to its consumers.
+ */
+ int getDeliveringCount();
+
+ /**
+ * Returns the number of messages added to this destination since it was created.
+ */
+ long getMessagesAdded();
+
// Operations ----------------------------------------------------
/**
Modified: branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSQueueControl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSQueueControl.java 2010-09-21 08:53:23 UTC (rev 9702)
+++ branches/hornetq-416/src/main/org/hornetq/api/jms/management/JMSQueueControl.java 2010-09-21 09:09:50 UTC (rev 9703)
@@ -51,11 +51,6 @@
void setDeadLetterAddress(@Parameter(name = "deadLetterAddress", desc = "Dead-letter address of the queue") String deadLetterAddress) throws Exception;
/**
- * Returns the number of messages added to this queue since it was created.
- */
- long getMessagesAdded();
-
- /**
* Returns the number of scheduled messages in this queue.
*/
long getScheduledCount();
@@ -66,11 +61,6 @@
int getConsumerCount();
/**
- * Returns the number of messages that this queue is currently delivering to its consumers.
- */
- int getDeliveringCount();
-
- /**
* returns the selector for the queue
*/
String getSelector();
Modified: branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java
===================================================================
--- branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java 2010-09-21 08:53:23 UTC (rev 9702)
+++ branches/hornetq-416/src/main/org/hornetq/jms/management/impl/JMSTopicControlImpl.java 2010-09-21 09:09:50 UTC (rev 9703)
@@ -118,6 +118,28 @@
return getMessageCount(DurabilityType.ALL);
}
+ /**
+ * Adds up the delivering count reported by every queue that
+ * getQueues(DurabilityType.ALL) returns for this topic.
+ */
+ public int getDeliveringCount()
+ {
+ int delivering = 0;
+ for (QueueControl subscriptionQueue : getQueues(DurabilityType.ALL))
+ {
+ delivering += subscriptionQueue.getDeliveringCount();
+ }
+ return delivering;
+ }
+
+ /**
+ * Adds up the messagesAdded value reported by every queue that
+ * getQueues(DurabilityType.ALL) returns for this topic.
+ */
+ public long getMessagesAdded()
+ {
+ List<QueueControl> queues = getQueues(DurabilityType.ALL);
+ // Accumulate in a long to match the return type: with an int accumulator the
+ // compound assignment silently narrows the sum, which wraps past Integer.MAX_VALUE.
+ long count = 0;
+ for (QueueControl queue : queues)
+ {
+ count += queue.getMessagesAdded();
+ }
+ return count;
+ }
+
public int getDurableMessageCount()
{
return getMessageCount(DurabilityType.DURABLE);
Modified: branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java
===================================================================
--- branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java 2010-09-21 08:53:23 UTC (rev 9702)
+++ branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/JMSUtil.java 2010-09-21 09:09:50 UTC (rev 9703)
@@ -77,26 +77,36 @@
}
static MessageConsumer createConsumer(final Connection connection,
+ final Destination destination) throws JMSException
+ {
+ return createConsumer(connection, destination, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ static MessageConsumer createConsumer(final Connection connection,
final Destination destination,
- final String connectorFactory) throws JMSException
+ int ackMode) throws JMSException
{
- Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session s = connection.createSession(false, ackMode);
return s.createConsumer(destination);
}
- public static MessageConsumer createConsumer(final Connection connection, final Destination destination) throws JMSException
+ static TopicSubscriber createDurableSubscriber(final Connection connection,
+ final Topic topic,
+ final String clientID,
+ final String subscriptionName) throws JMSException
{
- return JMSUtil.createConsumer(connection, destination, InVMConnectorFactory.class.getName());
+ return createDurableSubscriber(connection, topic, clientID, subscriptionName, Session.AUTO_ACKNOWLEDGE);
}
-
+
static TopicSubscriber createDurableSubscriber(final Connection connection,
final Topic topic,
final String clientID,
- final String subscriptionName) throws JMSException
+ final String subscriptionName,
+ final int ackMode) throws JMSException
{
connection.setClientID(clientID);
- Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session s = connection.createSession(false, ackMode);
return s.createDurableSubscriber(topic, subscriptionName);
}
Modified: branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java
===================================================================
--- branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java 2010-09-21 08:53:23 UTC (rev 9702)
+++ branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlTest.java 2010-09-21 09:09:50 UTC (rev 9703)
@@ -13,15 +13,22 @@
package org.hornetq.tests.integration.jms.server.management;
+import static junit.framework.Assert.assertEquals;
+
import java.util.Map;
import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TopicSubscriber;
import junit.framework.Assert;
+import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.TransportConfiguration;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.management.QueueControl;
import org.hornetq.api.jms.HornetQJMSClient;
import org.hornetq.api.jms.management.SubscriptionInfo;
import org.hornetq.api.jms.management.TopicControl;
@@ -406,7 +413,77 @@
{
}
}
+
+ /**
+ * Checks the topic's messagesAdded attribute with three subscriptions attached
+ * (one plain consumer, two durable subscribers): 0 before sending, then
+ * 3 * 2 after 2 messages are sent to the topic.
+ */
+ public void testGetMessagesAdded() throws Exception
+ {
+ Connection connection_1 = null;
+ Connection connection_2 = null;
+ Connection connection_3 = null;
+ try
+ {
+ connection_1 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ JMSUtil.createConsumer(connection_1, topic);
+ connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
+ connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+ TopicControl topicControl = createManagementControl();
+
+ Assert.assertEquals(0, topicControl.getMessagesAdded());
+
+ JMSUtil.sendMessages(topic, 2);
+
+ Assert.assertEquals(3 * 2, topicControl.getMessagesAdded());
+ }
+ finally
+ {
+ // close whatever was opened, even when an assertion above fails
+ if (connection_1 != null)
+ {
+ connection_1.close();
+ }
+ if (connection_2 != null)
+ {
+ connection_2.close();
+ }
+ if (connection_3 != null)
+ {
+ connection_3.close();
+ }
+ }
+ }
+
+ /**
+ * Checks the topic's deliveringCount attribute with three CLIENT_ACKNOWLEDGE
+ * subscriptions (one plain consumer, two durable subscribers): 0 until the
+ * connections are started and the 2 messages are received by each consumer,
+ * 3 * 2 once received, then dropping by 2 per acknowledged consumer.
+ */
+ public void testGetMessagesDelivering() throws Exception
+ {
+ Connection connection_1 = null;
+ Connection connection_2 = null;
+ Connection connection_3 = null;
+ try
+ {
+ connection_1 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ MessageConsumer cons_1 = JMSUtil.createConsumer(connection_1, topic, Session.CLIENT_ACKNOWLEDGE);
+ connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ MessageConsumer cons_2 = JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName, Session.CLIENT_ACKNOWLEDGE);
+ connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ MessageConsumer cons_3 = JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2", Session.CLIENT_ACKNOWLEDGE);
+
+ TopicControl topicControl = createManagementControl();
+
+ assertEquals(0, topicControl.getDeliveringCount());
+
+ JMSUtil.sendMessages(topic, 2);
+
+ assertEquals(0, topicControl.getDeliveringCount());
+
+ connection_1.start();
+ connection_2.start();
+ connection_3.start();
+
+ Message msg_1 = null;
+ Message msg_2 = null;
+ Message msg_3 = null;
+ for (int i = 0; i < 2; i++)
+ {
+ msg_1 = cons_1.receive(5000);
+ assertNotNull(msg_1);
+ msg_2 = cons_2.receive(5000);
+ assertNotNull(msg_2);
+ msg_3 = cons_3.receive(5000);
+ assertNotNull(msg_3);
+ }
+
+ assertEquals(3 * 2, topicControl.getDeliveringCount());
+
+ msg_1.acknowledge(); // acknowledges every message consumed by that consumer's session, i.e. both of them
+ assertEquals(2 * 2, topicControl.getDeliveringCount());
+ msg_2.acknowledge();
+ assertEquals(1 * 2, topicControl.getDeliveringCount());
+ msg_3.acknowledge();
+ assertEquals(0, topicControl.getDeliveringCount());
+ }
+ finally
+ {
+ // close whatever was opened, even when an assertion above fails
+ if (connection_1 != null)
+ {
+ connection_1.close();
+ }
+ if (connection_2 != null)
+ {
+ connection_2.close();
+ }
+ if (connection_3 != null)
+ {
+ connection_3.close();
+ }
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java
===================================================================
--- branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java 2010-09-21 08:53:23 UTC (rev 9702)
+++ branches/hornetq-416/tests/src/org/hornetq/tests/integration/jms/server/management/TopicControlUsingJMSTest.java 2010-09-21 09:09:50 UTC (rev 9703)
@@ -13,9 +13,12 @@
package org.hornetq.tests.integration.jms.server.management;
+import static junit.framework.Assert.assertEquals;
import static org.hornetq.tests.util.RandomUtil.randomString;
import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.Session;
@@ -26,6 +29,7 @@
import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.api.core.management.ResourceNames;
import org.hornetq.api.jms.HornetQJMSClient;
+import org.hornetq.api.jms.management.TopicControl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
@@ -333,6 +337,71 @@
}
}
+ /**
+ * Checks the messagesAdded attribute, read through the management proxy, with
+ * three subscriptions attached (one plain consumer, two durable subscribers):
+ * 0 before sending, then 3 * 2 after 2 messages are sent to the topic.
+ */
+ public void testGetMessagesAdded() throws Exception
+ {
+ Connection connection_1 = null;
+ Connection connection_2 = null;
+ Connection connection_3 = null;
+ try
+ {
+ connection_1 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ JMSUtil.createConsumer(connection_1, topic);
+ connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName);
+ connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2");
+
+ assertEquals(0, proxy.retrieveAttributeValue("messagesAdded"));
+
+ JMSUtil.sendMessages(topic, 2);
+
+ assertEquals(3 * 2, proxy.retrieveAttributeValue("messagesAdded"));
+ }
+ finally
+ {
+ // close whatever was opened, even when an assertion above fails
+ if (connection_1 != null)
+ {
+ connection_1.close();
+ }
+ if (connection_2 != null)
+ {
+ connection_2.close();
+ }
+ if (connection_3 != null)
+ {
+ connection_3.close();
+ }
+ }
+ }
+
+ /**
+ * Checks the deliveringCount attribute, read through the management proxy, with
+ * three CLIENT_ACKNOWLEDGE subscriptions (one plain consumer, two durable
+ * subscribers): 0 until the connections are started and the 2 messages are
+ * received by each consumer, 3 * 2 once received, then dropping by 2 per
+ * acknowledged consumer.
+ */
+ public void testGetMessagesDelivering() throws Exception
+ {
+ Connection connection_1 = null;
+ Connection connection_2 = null;
+ Connection connection_3 = null;
+ try
+ {
+ connection_1 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ MessageConsumer cons_1 = JMSUtil.createConsumer(connection_1, topic, Session.CLIENT_ACKNOWLEDGE);
+ connection_2 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ MessageConsumer cons_2 = JMSUtil.createDurableSubscriber(connection_2, topic, clientID, subscriptionName, Session.CLIENT_ACKNOWLEDGE);
+ connection_3 = JMSUtil.createConnection(InVMConnectorFactory.class.getName());
+ MessageConsumer cons_3 = JMSUtil.createDurableSubscriber(connection_3, topic, clientID, subscriptionName + "2", Session.CLIENT_ACKNOWLEDGE);
+
+ assertEquals(0, proxy.retrieveAttributeValue("deliveringCount"));
+
+ JMSUtil.sendMessages(topic, 2);
+
+ assertEquals(0, proxy.retrieveAttributeValue("deliveringCount"));
+
+ connection_1.start();
+ connection_2.start();
+ connection_3.start();
+
+ Message msg_1 = null;
+ Message msg_2 = null;
+ Message msg_3 = null;
+ for (int i = 0; i < 2; i++)
+ {
+ msg_1 = cons_1.receive(5000);
+ assertNotNull(msg_1);
+ msg_2 = cons_2.receive(5000);
+ assertNotNull(msg_2);
+ msg_3 = cons_3.receive(5000);
+ assertNotNull(msg_3);
+ }
+
+ assertEquals(3 * 2, proxy.retrieveAttributeValue("deliveringCount"));
+
+ msg_1.acknowledge(); // acknowledges every message consumed by that consumer's session, i.e. both of them
+ assertEquals(2 * 2, proxy.retrieveAttributeValue("deliveringCount"));
+ msg_2.acknowledge();
+ assertEquals(1 * 2, proxy.retrieveAttributeValue("deliveringCount"));
+ msg_3.acknowledge();
+ assertEquals(0, proxy.retrieveAttributeValue("deliveringCount"));
+ }
+ finally
+ {
+ // close whatever was opened, even when an assertion above fails
+ if (connection_1 != null)
+ {
+ connection_1.close();
+ }
+ if (connection_2 != null)
+ {
+ connection_2.close();
+ }
+ if (connection_3 != null)
+ {
+ connection_3.close();
+ }
+ }
+ }
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
14 years, 5 months