JBoss hornetq SVN: r8667 - in trunk/src/main/org/hornetq: core/management and 1 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-12-10 11:19:49 -0500 (Thu, 10 Dec 2009)
New Revision: 8667
Modified:
trunk/src/main/org/hornetq/core/client/ConnectionLoadBalancingPolicy.java
trunk/src/main/org/hornetq/core/management/BridgeControl.java
trunk/src/main/org/hornetq/core/management/DivertControl.java
trunk/src/main/org/hornetq/core/management/HornetQServerControl.java
trunk/src/main/org/hornetq/core/management/QueueControl.java
trunk/src/main/org/hornetq/utils/SimpleString.java
Log:
HORNETQ-185 + HORNETQ-186: API review + javadoc
* added javadoc for ConnectionLoadBalancingPolicy API
* fixed javadoc warnings
Modified: trunk/src/main/org/hornetq/core/client/ConnectionLoadBalancingPolicy.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/ConnectionLoadBalancingPolicy.java 2009-12-10 16:10:36 UTC (rev 8666)
+++ trunk/src/main/org/hornetq/core/client/ConnectionLoadBalancingPolicy.java 2009-12-10 16:19:49 UTC (rev 8667)
@@ -14,15 +14,16 @@
package org.hornetq.core.client;
/**
- * A ConnectionLoadBalancingPolicy
+ * A ConnectionLoadBalancingPolicy defines a policy to load balance between connections.
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- * Created 28 Nov 2008 10:19:56
- *
- *
*/
public interface ConnectionLoadBalancingPolicy
{
+ /**
+ * Returns the selected index according to the policy implementation.
+ *
+ * @param max maximum position index that can be selected
+ */
int select(int max);
}
Modified: trunk/src/main/org/hornetq/core/management/BridgeControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/BridgeControl.java 2009-12-10 16:10:36 UTC (rev 8666)
+++ trunk/src/main/org/hornetq/core/management/BridgeControl.java 2009-12-10 16:19:49 UTC (rev 8667)
@@ -43,9 +43,7 @@
String getFilterString();
/**
- * Return the name of the Transformer implementation associated to this bridge.
- *
- * @see org.hornetq.core.server.cluster.Transformer
+ * Return the name of the org.hornetq.core.server.cluster.Transformer implementation associated to this bridge.
*/
String getTransformerClassName();
Modified: trunk/src/main/org/hornetq/core/management/DivertControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/DivertControl.java 2009-12-10 16:10:36 UTC (rev 8666)
+++ trunk/src/main/org/hornetq/core/management/DivertControl.java 2009-12-10 16:19:49 UTC (rev 8667)
@@ -54,9 +54,7 @@
String getForwardingAddress();
/**
- * Return the name of the Transformer implementation associated to this bridge.
- *
- * @see org.hornetq.core.server.cluster.Transformer
+ * Return the name of the org.hornetq.core.server.cluster.Transformer implementation associated to this bridge.
*/
String getTransformerClassName();
}
Modified: trunk/src/main/org/hornetq/core/management/HornetQServerControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/HornetQServerControl.java 2009-12-10 16:10:36 UTC (rev 8666)
+++ trunk/src/main/org/hornetq/core/management/HornetQServerControl.java 2009-12-10 16:19:49 UTC (rev 8667)
@@ -169,7 +169,7 @@
/**
* Sets the maximum number of days kept in memory for message counter.
*
- * @count value must be greater than 0
+ * @param count value must be greater than 0
*/
void setMessageCounterMaxDayCount(int count) throws Exception;
@@ -401,7 +401,8 @@
* The Strings are Base-64 representation of the transaction XID and can be
* used to heuristically commit or rollback the transactions.
*
- * @see #commitPreparedTransaction(String), {@link #rollbackPreparedTransaction(String)}
+ * @see #commitPreparedTransaction(String)
+ * @see #rollbackPreparedTransaction(String)
*/
@Operation(desc = "List all the prepared transaction, sorted by date, oldest first")
String[] listPreparedTransactions() throws Exception;
Modified: trunk/src/main/org/hornetq/core/management/QueueControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/QueueControl.java 2009-12-10 16:10:36 UTC (rev 8666)
+++ trunk/src/main/org/hornetq/core/management/QueueControl.java 2009-12-10 16:19:49 UTC (rev 8667)
@@ -218,7 +218,7 @@
/**
* Changes the message's priority corresponding to the specified message ID to the specified priority.
*
- * @param priority between 0 and 9 inclusive.
+ * @param newPriority between 0 and 9 inclusive.
*
* @return {@code true}�if the message priority was changed
*/
Modified: trunk/src/main/org/hornetq/utils/SimpleString.java
===================================================================
--- trunk/src/main/org/hornetq/utils/SimpleString.java 2009-12-10 16:10:36 UTC (rev 8666)
+++ trunk/src/main/org/hornetq/utils/SimpleString.java 2009-12-10 16:19:49 UTC (rev 8667)
@@ -261,7 +261,6 @@
*
* i.e. "a.b" would return "a" and "b" if . was the delimeter
* @param delim
- * @return
*/
public SimpleString[] split(final char delim)
{
@@ -341,7 +340,7 @@
/**
* concatanates a SimpleString and a char
*
- * @param toAdd the char to concate with.
+ * @param c the char to concate with.
* @return the concatanated SimpleString
*/
public SimpleString concat(final char c)
15 years, 3 months
JBoss hornetq SVN: r8666 - trunk/src/main/org/hornetq/core/management.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-12-10 11:10:36 -0500 (Thu, 10 Dec 2009)
New Revision: 8666
Modified:
trunk/src/main/org/hornetq/core/management/QueueControl.java
Log:
HORNETQ-185 + HORNETQ-186: API review + javadoc
* added javadoc for QueueControl API
Modified: trunk/src/main/org/hornetq/core/management/QueueControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/QueueControl.java 2009-12-10 15:52:25 UTC (rev 8665)
+++ trunk/src/main/org/hornetq/core/management/QueueControl.java 2009-12-10 16:10:36 UTC (rev 8666)
@@ -21,117 +21,265 @@
import org.hornetq.core.server.management.Parameter;
/**
+ * A QueueControl is used to manage a queue.
+ *
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
- *
- * @version <tt>$Revision$</tt>
- *
*/
public interface QueueControl
{
// Attributes ----------------------------------------------------
+ /**
+ * Returns the name of this queue.
+ */
String getName();
+ /**
+ * Returns the address this queue is bound to.
+ */
String getAddress();
+ /**
+ * Returns this queue ID.
+ */
long getID();
+ /**
+ * Returns whether this queue is temporary.
+ */
boolean isTemporary();
+ /**
+ * Returns whether this queue is durable.
+ */
boolean isDurable();
+ /**
+ * Returns the filter associated to this queue.
+ */
String getFilter();
+ /**
+ * Returns the number of messages currently in this queue.
+ */
int getMessageCount();
+ /**
+ * Returns the number of scheduled messages in this queue.
+ */
long getScheduledCount();
+
+ /**
+ * Returns the number of consumers consuming messages from this queue.
+ */
int getConsumerCount();
+ /**
+ * Returns the number of messages that this queue is currently delivering to its consumers.
+ */
int getDeliveringCount();
+ /**
+ * Returns the number of messages added to this queue since it was created.
+ */
int getMessagesAdded();
+ /**
+ * Returns the expiry address associated to this queue.
+ */
String getExpiryAddress();
+ /**
+ * Sets the expiry address associated to this queue to the specified expiryAddress.
+ */
void setExpiryAddress(@Parameter(name = "expiryAddress", desc = "Expiry address of the queue") String expiryAddres) throws Exception;
+ /**
+ * Returns the dead-letter address associated to this queue.
+ */
String getDeadLetterAddress();
+ /**
+ * Sets the dead-letter address associated to this queue to the specified deadLetterAddress.
+ */
void setDeadLetterAddress(@Parameter(name = "deadLetterAddress", desc = "Dead-letter address of the queue") String deadLetterAddress) throws Exception;
// Operations ----------------------------------------------------
+ /**
+ * Lists all the messages scheduled for delivery for this queue.
+ * <br>
+ * 1 Map represents 1 message, keys are the message's properties and headers, values are the corresponding values.
+ */
@Operation(desc = "List the messages scheduled for delivery", impact = MBeanOperationInfo.INFO)
Map<String, Object>[] listScheduledMessages() throws Exception;
+ /**
+ * Lists all the messages scheduled for delivery for this queue using JSON serialization.
+ */
@Operation(desc = "List the messages scheduled for delivery and returns them using JSON", impact = MBeanOperationInfo.INFO)
String listScheduledMessagesAsJSON() throws Exception;
+ /**
+ * Lists all the messages in this queue matching the specified filter.
+ * <br>
+ * 1 Map represents 1 message, keys are the message's properties and headers, values are the corresponding values.
+ */
@Operation(desc = "List all the messages in the queue matching the given filter", impact = MBeanOperationInfo.INFO)
Map<String, Object>[] listMessages(@Parameter(name = "filter", desc = "A message filter (can be empty)") String filter) throws Exception;
+ /**
+ * Lists all the messages in this queue matching the specified filter using JSON serialization.
+ */
@Operation(desc = "List all the messages in the queue matching the given filter and returns them using JSON", impact = MBeanOperationInfo.INFO)
String listMessagesAsJSON(@Parameter(name = "filter", desc = "A message filter (can be empty)") String filter) throws Exception;
+ /**
+ * Counts the number of messages in this queue matching the specified filter.
+ */
@Operation(desc = "Returns the number of the messages in the queue matching the given filter", impact = MBeanOperationInfo.INFO)
int countMessages(@Parameter(name = "filter", desc = "A message filter (can be empty)") String filter) throws Exception;
+ /**
+ * Removes the message corresponding to the specified message ID.
+ *
+ * @return {@code true}�if the message was removed, {@code false} else
+ */
@Operation(desc = "Remove the message corresponding to the given messageID", impact = MBeanOperationInfo.ACTION)
boolean removeMessage(@Parameter(name = "messageID", desc = "A message ID") long messageID) throws Exception;
+ /**
+ * Removes all the message corresponding to the specified filter.
+ * <br>
+ * Using {@code null} or an empty filter will remove <em>all</em> messages from this queue.
+ *
+ * @return the number of removed messages
+ */
@Operation(desc = "Remove the messages corresponding to the given filter (and returns the number of removed messages)", impact = MBeanOperationInfo.ACTION)
int removeMessages(@Parameter(name = "filter", desc = "A message filter (can be empty)") String filter) throws Exception;
- @Operation(desc = "Remove the messages corresponding to the given filter (and returns the number of expired messages)", impact = MBeanOperationInfo.ACTION)
+ /**
+ * Expires all the message corresponding to the specified filter.
+ * <br>
+ * Using {@code null} or an empty filter will expire <em>all</em> messages from this queue.
+ *
+ * @return the number of expired messages
+ */
+ @Operation(desc = "Expire the messages corresponding to the given filter (and returns the number of expired messages)", impact = MBeanOperationInfo.ACTION)
int expireMessages(@Parameter(name = "filter", desc = "A message filter") String filter) throws Exception;
+ /**
+ * Expires the message corresponding to the specified message ID.
+ *
+ * @return {@code true}�if the message was expired, {@code false} else
+ */
@Operation(desc = "Remove the message corresponding to the given messageID", impact = MBeanOperationInfo.ACTION)
boolean expireMessage(@Parameter(name = "messageID", desc = "A message ID") long messageID) throws Exception;
+ /**
+ * Moves the message corresponding to the specified message ID to the specified other queue.
+ *
+ * @return {@code true}�if the message was moved, {@code false} else
+ */
@Operation(desc = "Move the message corresponding to the given messageID to another queue", impact = MBeanOperationInfo.ACTION)
boolean moveMessage(@Parameter(name = "messageID", desc = "A message ID") long messageID,
@Parameter(name = "otherQueueName", desc = "The name of the queue to move the message to") String otherQueueName) throws Exception;
+ /**
+ * Moves all the message corresponding to the specified filter to the specified other queue.
+ * <br>
+ * Using {@code null} or an empty filter will move <em>all</em> messages from this queue.
+ *
+ * @return the number of moved messages
+ */
@Operation(desc = "Move the messages corresponding to the given filter (and returns the number of moved messages)", impact = MBeanOperationInfo.ACTION)
int moveMessages(@Parameter(name = "filter", desc = "A message filter (can be empty)") String filter,
@Parameter(name = "otherQueueName", desc = "The name of the queue to move the messages to") String otherQueueName) throws Exception;
+ /**
+ * Sends the message corresponding to the specified message ID to this queue's dead letter address.
+ *
+ * @return {@code true}�if the message was sent to the dead letter address, {@code false} else
+ */
@Operation(desc = "Send the message corresponding to the given messageID to this queue's Dead Letter Address", impact = MBeanOperationInfo.ACTION)
boolean sendMessageToDeadLetterAddress(@Parameter(name = "messageID", desc = "A message ID") long messageID) throws Exception;
+ /**
+ * Sends all the message corresponding to the specified filter to this queue's dead letter address.
+ * <br>
+ * Using {@code null} or an empty filter will send <em>all</em> messages from this queue.
+ *
+ * @return the number of sent messages
+ */
@Operation(desc = "Send the messages corresponding to the given filter to this queue's Dead Letter Address", impact = MBeanOperationInfo.ACTION)
int sendMessagesToDeadLetterAddress(@Parameter(name = "filter", desc = "A message filter (can be empty)") String filterStr) throws Exception;
+ /**
+ * Changes the message's priority corresponding to the specified message ID to the specified priority.
+ *
+ * @param priority between 0 and 9 inclusive.
+ *
+ * @return {@code true}�if the message priority was changed
+ */
@Operation(desc = "Change the priority of the message corresponding to the given messageID", impact = MBeanOperationInfo.ACTION)
boolean changeMessagePriority(@Parameter(name = "messageID", desc = "A message ID") long messageID,
@Parameter(name = "newPriority", desc = "the new priority (between 0 and 9)") int newPriority) throws Exception;
+ /**
+ * Changes the priority for all the message corresponding to the specified filter to the specified priority.
+ *
+ * @return the number of changed messages
+ */
@Operation(desc = "Change the priority of the messages corresponding to the given filter", impact = MBeanOperationInfo.ACTION)
int changeMessagesPriority(@Parameter(name = "filter", desc = "A message filter (can be empty)") String filter,
@Parameter(name = "newPriority", desc = "the new priority (between 0 and 9)") int newPriority) throws Exception;
+ /**
+ * Lists the message counter for this queue.
+ */
@Operation(desc = "List the message counters", impact = MBeanOperationInfo.INFO)
String listMessageCounter() throws Exception;
+ /**
+ * Resets the message counter for this queue.
+ */
@Operation(desc = "Reset the message counters", impact = MBeanOperationInfo.INFO)
void resetMessageCounter() throws Exception;
+ /**
+ * Lists the message counter for this queue as a HTML table.
+ */
@Operation(desc = "List the message counters as HTML", impact = MBeanOperationInfo.INFO)
String listMessageCounterAsHTML() throws Exception;
+ /**
+ * Lists the message counter history for this queue.
+ */
@Operation(desc = "List the message counters history", impact = MBeanOperationInfo.INFO)
String listMessageCounterHistory() throws Exception;
+ /**
+ * Lists the message counter history for this queue as a HTML table.
+ */
@Operation(desc = "List the message counters history HTML", impact = MBeanOperationInfo.INFO)
String listMessageCounterHistoryAsHTML() throws Exception;
+ /**
+ * Pauses the queue. Messages are no longer delivered to its consumers.
+ */
@Operation(desc = "Pauses the Queue", impact = MBeanOperationInfo.ACTION)
void pause() throws Exception;
+ /**
+ * Resumes the queue. Messages are again delivered to its consumers.
+ */
@Operation(desc = "Resumes delivery of queued messages and gets the queue out of paused state.", impact = MBeanOperationInfo.ACTION)
void resume() throws Exception;
+ /**
+ * Returns whether the queue is pause.
+ */
@Operation(desc = "Inspects if the queue is paused", impact = MBeanOperationInfo.INFO)
boolean isPaused() throws Exception;
}
15 years, 3 months
JBoss hornetq SVN: r8665 - in trunk/src/main/org/hornetq: jms/server/management/impl and 1 other directory.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-12-10 10:52:25 -0500 (Thu, 10 Dec 2009)
New Revision: 8665
Modified:
trunk/src/main/org/hornetq/core/management/impl/AbstractControl.java
trunk/src/main/org/hornetq/core/management/impl/AcceptorControlImpl.java
trunk/src/main/org/hornetq/core/management/impl/AddressControlImpl.java
trunk/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java
trunk/src/main/org/hornetq/core/management/impl/BroadcastGroupControlImpl.java
trunk/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java
trunk/src/main/org/hornetq/core/management/impl/DiscoveryGroupControlImpl.java
trunk/src/main/org/hornetq/core/management/impl/DivertControlImpl.java
trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
trunk/src/main/org/hornetq/jms/server/management/impl/JMSConnectionFactoryControlImpl.java
trunk/src/main/org/hornetq/jms/server/management/impl/JMSQueueControlImpl.java
trunk/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java
trunk/src/main/org/hornetq/jms/server/management/impl/JMSTopicControlImpl.java
Log:
readded JMX MBeanOperationInfo to provide meaningful parameters name to JMX clients
Modified: trunk/src/main/org/hornetq/core/management/impl/AbstractControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/AbstractControl.java 2009-12-10 15:25:39 UTC (rev 8664)
+++ trunk/src/main/org/hornetq/core/management/impl/AbstractControl.java 2009-12-10 15:52:25 UTC (rev 8665)
@@ -13,6 +13,8 @@
package org.hornetq.core.management.impl;
+import javax.management.MBeanInfo;
+import javax.management.MBeanOperationInfo;
import javax.management.NotCompliantMBeanException;
import javax.management.StandardMBean;
@@ -77,6 +79,20 @@
}
+ abstract MBeanOperationInfo[] fillMBeanOperationInfo();
+
+ @Override
+ public MBeanInfo getMBeanInfo()
+ {
+ MBeanInfo info = super.getMBeanInfo();
+ return new MBeanInfo(info.getClassName(),
+ info.getDescription(),
+ info.getAttributes(),
+ info.getConstructors(),
+ fillMBeanOperationInfo(),
+ info.getNotifications());
+ }
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/management/impl/AcceptorControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/AcceptorControlImpl.java 2009-12-10 15:25:39 UTC (rev 8664)
+++ trunk/src/main/org/hornetq/core/management/impl/AcceptorControlImpl.java 2009-12-10 15:52:25 UTC (rev 8665)
@@ -15,8 +15,11 @@
import java.util.Map;
+import javax.management.MBeanOperationInfo;
+
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.management.AcceptorControl;
+import org.hornetq.core.management.AddressControl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.remoting.spi.Acceptor;
@@ -130,6 +133,12 @@
blockOnIO();
}
}
+
+ @Override
+ MBeanOperationInfo[] fillMBeanOperationInfo()
+ {
+ return MBeanInfoHelper.getMBeanOperationsInfo(AcceptorControl.class);
+ }
// Public --------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/management/impl/AddressControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/AddressControlImpl.java 2009-12-10 15:25:39 UTC (rev 8664)
+++ trunk/src/main/org/hornetq/core/management/impl/AddressControlImpl.java 2009-12-10 15:52:25 UTC (rev 8665)
@@ -16,6 +16,8 @@
import java.util.Iterator;
import java.util.Set;
+import javax.management.MBeanOperationInfo;
+
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.AddressControl;
import org.hornetq.core.paging.PagingManager;
@@ -241,6 +243,12 @@
blockOnIO();
}
}
+
+ @Override
+ MBeanOperationInfo[] fillMBeanOperationInfo()
+ {
+ return MBeanInfoHelper.getMBeanOperationsInfo(AddressControl.class);
+ }
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java 2009-12-10 15:25:39 UTC (rev 8664)
+++ trunk/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java 2009-12-10 15:52:25 UTC (rev 8665)
@@ -13,7 +13,10 @@
package org.hornetq.core.management.impl;
+import javax.management.MBeanOperationInfo;
+
import org.hornetq.core.config.cluster.BridgeConfiguration;
+import org.hornetq.core.management.AddressControl;
import org.hornetq.core.management.BridgeControl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.cluster.Bridge;
@@ -250,6 +253,12 @@
blockOnIO();
}
}
+
+ @Override
+ MBeanOperationInfo[] fillMBeanOperationInfo()
+ {
+ return MBeanInfoHelper.getMBeanOperationsInfo(BridgeControl.class);
+ }
// Public --------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/management/impl/BroadcastGroupControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/BroadcastGroupControlImpl.java 2009-12-10 15:25:39 UTC (rev 8664)
+++ trunk/src/main/org/hornetq/core/management/impl/BroadcastGroupControlImpl.java 2009-12-10 15:52:25 UTC (rev 8665)
@@ -13,7 +13,10 @@
package org.hornetq.core.management.impl;
+import javax.management.MBeanOperationInfo;
+
import org.hornetq.core.config.cluster.BroadcastGroupConfiguration;
+import org.hornetq.core.management.AddressControl;
import org.hornetq.core.management.BroadcastGroupControl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.cluster.BroadcastGroup;
@@ -208,6 +211,12 @@
}
}
+ @Override
+ MBeanOperationInfo[] fillMBeanOperationInfo()
+ {
+ return MBeanInfoHelper.getMBeanOperationsInfo(BroadcastGroupControl.class);
+ }
+
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java 2009-12-10 15:25:39 UTC (rev 8664)
+++ trunk/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java 2009-12-10 15:52:25 UTC (rev 8665)
@@ -16,7 +16,10 @@
import java.util.List;
import java.util.Map;
+import javax.management.MBeanOperationInfo;
+
import org.hornetq.core.config.cluster.ClusterConnectionConfiguration;
+import org.hornetq.core.management.AddressControl;
import org.hornetq.core.management.ClusterConnectionControl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.cluster.ClusterConnection;
@@ -278,6 +281,12 @@
}
}
+ @Override
+ MBeanOperationInfo[] fillMBeanOperationInfo()
+ {
+ return MBeanInfoHelper.getMBeanOperationsInfo(ClusterConnectionControl.class);
+ }
+
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/hornetq/core/management/impl/DiscoveryGroupControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/DiscoveryGroupControlImpl.java 2009-12-10 15:25:39 UTC (rev 8664)
+++ trunk/src/main/org/hornetq/core/management/impl/DiscoveryGroupControlImpl.java 2009-12-10 15:52:25 UTC (rev 8665)
@@ -13,8 +13,11 @@
package org.hornetq.core.management.impl;
+import javax.management.MBeanOperationInfo;
+
import org.hornetq.core.cluster.DiscoveryGroup;
import org.hornetq.core.config.cluster.DiscoveryGroupConfiguration;
+import org.hornetq.core.management.AddressControl;
import org.hornetq.core.management.DiscoveryGroupControl;
import org.hornetq.core.persistence.StorageManager;
@@ -147,7 +150,14 @@
}
}
+
+ @Override
+ MBeanOperationInfo[] fillMBeanOperationInfo()
+ {
+ return MBeanInfoHelper.getMBeanOperationsInfo(DiscoveryGroupControl.class);
+ }
+
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/hornetq/core/management/impl/DivertControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/DivertControlImpl.java 2009-12-10 15:25:39 UTC (rev 8664)
+++ trunk/src/main/org/hornetq/core/management/impl/DivertControlImpl.java 2009-12-10 15:52:25 UTC (rev 8665)
@@ -13,6 +13,8 @@
package org.hornetq.core.management.impl;
+import javax.management.MBeanOperationInfo;
+
import org.hornetq.core.config.cluster.DivertConfiguration;
import org.hornetq.core.management.DivertControl;
import org.hornetq.core.persistence.StorageManager;
@@ -141,7 +143,14 @@
blockOnIO();
}
}
+
+ @Override
+ MBeanOperationInfo[] fillMBeanOperationInfo()
+ {
+ return MBeanInfoHelper.getMBeanOperationsInfo(DivertControl.class);
+ }
+
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2009-12-10 15:25:39 UTC (rev 8664)
+++ trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2009-12-10 15:52:25 UTC (rev 8665)
@@ -26,6 +26,7 @@
import javax.management.ListenerNotFoundException;
import javax.management.MBeanNotificationInfo;
+import javax.management.MBeanOperationInfo;
import javax.management.NotificationBroadcasterSupport;
import javax.management.NotificationEmitter;
import javax.management.NotificationFilter;
@@ -1208,4 +1209,10 @@
return configuration.isWildcardRoutingEnabled();
}
+ @Override
+ MBeanOperationInfo[] fillMBeanOperationInfo()
+ {
+ return MBeanInfoHelper.getMBeanOperationsInfo(HornetQServerControl.class);
+ }
+
}
Modified: trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2009-12-10 15:25:39 UTC (rev 8664)
+++ trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2009-12-10 15:52:25 UTC (rev 8665)
@@ -16,6 +16,8 @@
import java.util.List;
import java.util.Map;
+import javax.management.MBeanOperationInfo;
+
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.filter.impl.FilterImpl;
@@ -700,6 +702,12 @@
}
}
+ @Override
+ MBeanOperationInfo[] fillMBeanOperationInfo()
+ {
+ return MBeanInfoHelper.getMBeanOperationsInfo(QueueControl.class);
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/hornetq/jms/server/management/impl/JMSConnectionFactoryControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/management/impl/JMSConnectionFactoryControlImpl.java 2009-12-10 15:25:39 UTC (rev 8664)
+++ trunk/src/main/org/hornetq/jms/server/management/impl/JMSConnectionFactoryControlImpl.java 2009-12-10 15:52:25 UTC (rev 8665)
@@ -15,11 +15,14 @@
import java.util.List;
+import javax.management.MBeanInfo;
import javax.management.NotCompliantMBeanException;
import javax.management.StandardMBean;
+import org.hornetq.core.management.impl.MBeanInfoHelper;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.server.management.ConnectionFactoryControl;
+import org.hornetq.jms.server.management.JMSQueueControl;
/**
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
@@ -167,6 +170,18 @@
return cf.isAutoGroup();
}
+ @Override
+ public MBeanInfo getMBeanInfo()
+ {
+ MBeanInfo info = super.getMBeanInfo();
+ return new MBeanInfo(info.getClassName(),
+ info.getDescription(),
+ info.getAttributes(),
+ info.getConstructors(),
+ MBeanInfoHelper.getMBeanOperationsInfo(ConnectionFactoryControl.class),
+ info.getNotifications());
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/hornetq/jms/server/management/impl/JMSQueueControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/management/impl/JMSQueueControlImpl.java 2009-12-10 15:25:39 UTC (rev 8664)
+++ trunk/src/main/org/hornetq/jms/server/management/impl/JMSQueueControlImpl.java 2009-12-10 15:52:25 UTC (rev 8665)
@@ -15,12 +15,14 @@
import java.util.Map;
+import javax.management.MBeanInfo;
import javax.management.StandardMBean;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.MessageCounterInfo;
import org.hornetq.core.management.QueueControl;
+import org.hornetq.core.management.impl.MBeanInfoHelper;
import org.hornetq.core.messagecounter.MessageCounter;
import org.hornetq.core.messagecounter.impl.MessageCounterHelper;
import org.hornetq.jms.HornetQQueue;
@@ -336,6 +338,18 @@
coreQueueControl.resume();
}
+ @Override
+ public MBeanInfo getMBeanInfo()
+ {
+ MBeanInfo info = super.getMBeanInfo();
+ return new MBeanInfo(info.getClassName(),
+ info.getDescription(),
+ info.getAttributes(),
+ info.getConstructors(),
+ MBeanInfoHelper.getMBeanOperationsInfo(JMSQueueControl.class),
+ info.getNotifications());
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java 2009-12-10 15:25:39 UTC (rev 8664)
+++ trunk/src/main/org/hornetq/jms/server/management/impl/JMSServerControlImpl.java 2009-12-10 15:52:25 UTC (rev 8665)
@@ -19,6 +19,7 @@
import java.util.concurrent.atomic.AtomicLong;
import javax.management.ListenerNotFoundException;
+import javax.management.MBeanInfo;
import javax.management.MBeanNotificationInfo;
import javax.management.Notification;
import javax.management.NotificationBroadcasterSupport;
@@ -29,6 +30,7 @@
import org.hornetq.core.client.management.impl.ManagementHelper;
import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.management.impl.MBeanInfoHelper;
import org.hornetq.jms.server.JMSServerManager;
import org.hornetq.jms.server.management.ConnectionFactoryControl;
import org.hornetq.jms.server.management.JMSQueueControl;
@@ -779,6 +781,18 @@
return server.listSessions(connectionID);
}
+ @Override
+ public MBeanInfo getMBeanInfo()
+ {
+ MBeanInfo info = super.getMBeanInfo();
+ return new MBeanInfo(info.getClassName(),
+ info.getDescription(),
+ info.getAttributes(),
+ info.getConstructors(),
+ MBeanInfoHelper.getMBeanOperationsInfo(JMSServerControl.class),
+ info.getNotifications());
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/hornetq/jms/server/management/impl/JMSTopicControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/management/impl/JMSTopicControlImpl.java 2009-12-10 15:25:39 UTC (rev 8664)
+++ trunk/src/main/org/hornetq/jms/server/management/impl/JMSTopicControlImpl.java 2009-12-10 15:52:25 UTC (rev 8665)
@@ -18,6 +18,7 @@
import java.util.List;
import java.util.Map;
+import javax.management.MBeanInfo;
import javax.management.StandardMBean;
import org.hornetq.core.exception.HornetQException;
@@ -26,10 +27,12 @@
import org.hornetq.core.management.HornetQServerControl;
import org.hornetq.core.management.QueueControl;
import org.hornetq.core.management.ResourceNames;
+import org.hornetq.core.management.impl.MBeanInfoHelper;
import org.hornetq.core.server.management.ManagementService;
import org.hornetq.jms.HornetQTopic;
import org.hornetq.jms.client.HornetQMessage;
import org.hornetq.jms.client.SelectorTranslator;
+import org.hornetq.jms.server.management.JMSQueueControl;
import org.hornetq.jms.server.management.TopicControl;
import org.hornetq.utils.Pair;
import org.hornetq.utils.json.JSONArray;
@@ -347,6 +350,18 @@
}
}
+ @Override
+ public MBeanInfo getMBeanInfo()
+ {
+ MBeanInfo info = super.getMBeanInfo();
+ return new MBeanInfo(info.getClassName(),
+ info.getDescription(),
+ info.getAttributes(),
+ info.getConstructors(),
+ MBeanInfoHelper.getMBeanOperationsInfo(TopicControl.class),
+ info.getNotifications());
+ }
+
// Inner classes -------------------------------------------------
private enum DurabilityType
15 years, 3 months
JBoss hornetq SVN: r8664 - in trunk: examples/common/config and 32 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-12-10 10:25:39 -0500 (Thu, 10 Dec 2009)
New Revision: 8664
Added:
trunk/src/main/org/hornetq/core/server/management/
trunk/src/main/org/hornetq/core/server/management/ManagementService.java
trunk/src/main/org/hornetq/core/server/management/Notification.java
trunk/src/main/org/hornetq/core/server/management/NotificationListener.java
trunk/src/main/org/hornetq/core/server/management/NotificationService.java
trunk/src/main/org/hornetq/core/server/management/Operation.java
trunk/src/main/org/hornetq/core/server/management/Parameter.java
trunk/src/main/org/hornetq/core/server/management/impl/
trunk/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
Removed:
trunk/src/main/org/hornetq/core/management/ManagementService.java
trunk/src/main/org/hornetq/core/management/Notification.java
trunk/src/main/org/hornetq/core/management/NotificationListener.java
trunk/src/main/org/hornetq/core/management/NotificationService.java
trunk/src/main/org/hornetq/core/management/Operation.java
trunk/src/main/org/hornetq/core/management/Parameter.java
trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
Modified:
trunk/docs/user-manual/en/configuration-index.xml
trunk/docs/user-manual/en/management.xml
trunk/examples/common/config/hornetq-example-beans.xml
trunk/src/config/common/schema/hornetq-configuration.xsd
trunk/src/main/org/hornetq/core/client/ClientMessage.java
trunk/src/main/org/hornetq/core/cluster/DiscoveryGroup.java
trunk/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
trunk/src/main/org/hornetq/core/config/Configuration.java
trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
trunk/src/main/org/hornetq/core/management/AcceptorControl.java
trunk/src/main/org/hornetq/core/management/AddressControl.java
trunk/src/main/org/hornetq/core/management/BridgeControl.java
trunk/src/main/org/hornetq/core/management/BroadcastGroupControl.java
trunk/src/main/org/hornetq/core/management/ClusterConnectionControl.java
trunk/src/main/org/hornetq/core/management/DayCounterInfo.java
trunk/src/main/org/hornetq/core/management/DiscoveryGroupControl.java
trunk/src/main/org/hornetq/core/management/DivertControl.java
trunk/src/main/org/hornetq/core/management/HornetQComponentControl.java
trunk/src/main/org/hornetq/core/management/HornetQServerControl.java
trunk/src/main/org/hornetq/core/management/ObjectNameBuilder.java
trunk/src/main/org/hornetq/core/management/QueueControl.java
trunk/src/main/org/hornetq/core/management/ResourceNames.java
trunk/src/main/org/hornetq/core/management/RoleInfo.java
trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
trunk/src/main/org/hornetq/core/management/impl/MBeanInfoHelper.java
trunk/src/main/org/hornetq/core/message/Message.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java
trunk/src/main/org/hornetq/core/security/impl/SecurityStoreImpl.java
trunk/src/main/org/hornetq/core/server/HornetQServer.java
trunk/src/main/org/hornetq/core/server/cluster/Bridge.java
trunk/src/main/org/hornetq/core/server/cluster/BroadcastGroup.java
trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
trunk/src/main/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
trunk/src/main/org/hornetq/core/server/group/GroupingHandler.java
trunk/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
trunk/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
trunk/src/main/org/hornetq/jms/server/management/DestinationControl.java
trunk/src/main/org/hornetq/jms/server/management/JMSQueueControl.java
trunk/src/main/org/hornetq/jms/server/management/JMSServerControl.java
trunk/src/main/org/hornetq/jms/server/management/TopicControl.java
trunk/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java
trunk/src/main/org/hornetq/jms/server/management/impl/JMSTopicControlImpl.java
trunk/tests/config/ConfigurationTest-full-config.xml
trunk/tests/src/org/hornetq/tests/integration/SimpleNotificationService.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
trunk/tests/src/org/hornetq/tests/integration/discovery/DiscoveryTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/bridge/BridgeTestBase.java
trunk/tests/src/org/hornetq/tests/integration/management/AcceptorControlTest.java
trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java
trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java
trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java
trunk/tests/src/org/hornetq/tests/integration/management/ManagementServiceImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/config/impl/ConfigurationImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/config/impl/DefaultsFileConfigurationTest.java
trunk/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java
Log:
HORNETQ-185 + HORNETQ-186: API review + javadoc
* move ManagementService + related interfaces to org.hornetq.core.server.management
(as they are not part of the client management API)
* removed unused management-request-timeout from configuration
* added javadoc for org.hornetq.core.management API
Modified: trunk/docs/user-manual/en/configuration-index.xml
===================================================================
--- trunk/docs/user-manual/en/configuration-index.xml 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/docs/user-manual/en/configuration-index.xml 2009-12-10 15:25:39 UTC (rev 8664)
@@ -250,14 +250,6 @@
<entry>hornetq.notifications</entry>
</row>
<row>
- <entry><link linkend="management.replication"
- >management-request-timeout</link></entry>
- <entry>Long</entry>
- <entry>how long (in ms) to wait for a reply to a management
- request</entry>
- <entry>5000</entry>
- </row>
- <row>
<entry><link linkend="configuring.message.counters"
>message-counter-enabled</link></entry>
<entry>Boolean</entry>
Modified: trunk/docs/user-manual/en/management.xml
===================================================================
--- trunk/docs/user-manual/en/management.xml 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/docs/user-manual/en/management.xml 2009-12-10 15:25:39 UTC (rev 8664)
@@ -737,15 +737,6 @@
<para>It is strongly suggested to change these values from their default. If they are not
changed from the default, HornetQ will detect this and pester you with a warning on every
start-up.</para>
- <para>HornetQ internally uses Core messages to replicate management operations between the
- live and backup server when JMX is used. By default, there is a timeout of 5s (5000ms) to
- send a management request from the live server to the backup server and wait for a reply.
- If a reply is not received before the timeout is hit, HornetQ considers the replication has
- failed. This timeout can be configured in <literal
- >hornetq-configuration.xml</literal>:</para>
- <programlisting>
- <management-request-timeout>5000</management-request-timeout>
- </programlisting>
</section>
<section id="management.notifications">
<title>Management Notifications</title>
Modified: trunk/examples/common/config/hornetq-example-beans.xml
===================================================================
--- trunk/examples/common/config/hornetq-example-beans.xml 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/examples/common/config/hornetq-example-beans.xml 2009-12-10 15:25:39 UTC (rev 8664)
@@ -138,7 +138,7 @@
</constructor>
</bean>
- <bean name="ManagementService" class="org.hornetq.core.management.impl.ManagementServiceImpl">
+ <bean name="ManagementService" class="org.hornetq.core.server.management.impl.ManagementServiceImpl">
<constructor>
<parameter>
<inject bean="MBeanServer"/>
Modified: trunk/src/config/common/schema/hornetq-configuration.xsd
===================================================================
--- trunk/src/config/common/schema/hornetq-configuration.xsd 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/config/common/schema/hornetq-configuration.xsd 2009-12-10 15:25:39 UTC (rev 8664)
@@ -37,8 +37,6 @@
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="log-delegate-factory-class-name" type="xsd:string">
</xsd:element>
- <xsd:element maxOccurs="1" minOccurs="0" name="management-request-timeout" type="xsd:long">
- </xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="jmx-management-enabled" type="xsd:boolean">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="jmx-domain" type="xsd:string">
Modified: trunk/src/main/org/hornetq/core/client/ClientMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/ClientMessage.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/client/ClientMessage.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -63,7 +63,7 @@
/**
* Sets the OutputStream that will receive the content of a message received in a non blocking way.
- *
+ * <br>
* This method is used when consuming large messages
*
* @throws HornetQException
@@ -73,8 +73,7 @@
/**
* Saves the content of the message to the OutputStream.
* It will block until the entire content is transfered to the OutputStream.
- *
- * This method is used for when consuming large messages
+ * <br>
*
* @throws HornetQException
*/
@@ -88,13 +87,12 @@
* @param timeMilliseconds - 0 means wait forever
* @return true if it reached the end
* @throws HornetQException
-
*/
boolean waitOutputStreamCompletion(long timeMilliseconds) throws HornetQException;
/**
- * Sets the body's IntputStream.
- *
+ * Sets the body's IntputStream.
+ * <br>
* This method is used when sending large messages
*
* @throws HornetQException
Modified: trunk/src/main/org/hornetq/core/cluster/DiscoveryGroup.java
===================================================================
--- trunk/src/main/org/hornetq/core/cluster/DiscoveryGroup.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/cluster/DiscoveryGroup.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -15,7 +15,7 @@
import java.util.Map;
-import org.hornetq.core.management.NotificationService;
+import org.hornetq.core.server.management.NotificationService;
/**
* A DiscoveryGroup
Modified: trunk/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/cluster/impl/DiscoveryGroupImpl.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -30,9 +30,9 @@
import org.hornetq.core.cluster.DiscoveryListener;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.management.Notification;
-import org.hornetq.core.management.NotificationService;
import org.hornetq.core.management.NotificationType;
+import org.hornetq.core.server.management.Notification;
+import org.hornetq.core.server.management.NotificationService;
import org.hornetq.utils.Pair;
import org.hornetq.utils.SimpleString;
import org.hornetq.utils.TypedProperties;
Modified: trunk/src/main/org/hornetq/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/Configuration.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/config/Configuration.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -165,10 +165,6 @@
void setManagementClusterPassword(String password);
- long getManagementRequestTimeout();
-
- void setManagementRequestTimeout(long timeout);
-
int getIDCacheSize();
void setIDCacheSize(int idCacheSize);
Modified: trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -140,8 +140,6 @@
public static final String DEFAULT_MANAGEMENT_CLUSTER_PASSWORD = "CHANGE ME!!";
- public static final long DEFAULT_MANAGEMENT_REQUEST_TIMEOUT = 5000;
-
public static final long DEFAULT_BROADCAST_PERIOD = 1000;
public static final long DEFAULT_BROADCAST_REFRESH_TIMEOUT = 10000;
@@ -308,8 +306,6 @@
protected String managementClusterPassword = ConfigurationImpl.DEFAULT_MANAGEMENT_CLUSTER_PASSWORD;
- protected long managementRequestTimeout = ConfigurationImpl.DEFAULT_MANAGEMENT_REQUEST_TIMEOUT;
-
protected long serverDumpInterval = ConfigurationImpl.DEFAULT_SERVER_DUMP_INTERVAL;
// percentage of free memory which triggers warning from the memory manager
@@ -872,16 +868,6 @@
managementClusterPassword = clusterPassword;
}
- public long getManagementRequestTimeout()
- {
- return managementRequestTimeout;
- }
-
- public void setManagementRequestTimeout(final long managementRequestTimeout)
- {
- this.managementRequestTimeout = managementRequestTimeout;
- }
-
public int getJournalCompactMinFiles()
{
return journalCompactMinFiles;
@@ -1220,10 +1206,6 @@
{
return false;
}
- if (managementRequestTimeout != other.managementRequestTimeout)
- {
- return false;
- }
if (messageCounterEnabled != other.messageCounterEnabled)
{
return false;
Modified: trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -180,11 +180,6 @@
managementClusterUser,
Validators.NOT_NULL_OR_EMPTY);
- managementRequestTimeout = XMLConfigurationUtil.getLong(e,
- "management-request-timeout",
- managementRequestTimeout,
- Validators.GT_ZERO);
-
logDelegateFactoryClassName = XMLConfigurationUtil.getString(e,
"log-delegate-factory-class-name",
logDelegateFactoryClassName,
Modified: trunk/src/main/org/hornetq/core/management/AcceptorControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/AcceptorControl.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/management/AcceptorControl.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -15,17 +15,33 @@
import java.util.Map;
+import org.hornetq.core.remoting.spi.Acceptor;
+import org.hornetq.core.remoting.spi.AcceptorFactory;
+
/**
- * A AcceptorMBean
+ * An AcceptorControl is used to manage Acceptors.
*
* @author <a href="jmesnil(a)redhat.com">Jeff Mesnil</a>
- *
+ *
+ * @see Acceptor
*/
public interface AcceptorControl extends HornetQComponentControl
{
+ /**
+ * Returns the name of the acceptor
+ */
String getName();
+ /**
+ * Returns the class name of the AcceptorFactory implementation
+ * used by this acceptor.
+ *
+ * @see AcceptorFactory
+ */
String getFactoryClassName();
+ /**
+ * Returns the parameters used to configure this acceptor
+ */
Map<String, Object> getParameters();
}
Modified: trunk/src/main/org/hornetq/core/management/AddressControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/AddressControl.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/management/AddressControl.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -13,33 +13,67 @@
package org.hornetq.core.management;
+import org.hornetq.core.server.management.Operation;
+import org.hornetq.core.server.management.Parameter;
+
/**
+ * An AddressControl is used to manage an address.
+ *
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
- *
- * @version <tt>$Revision$</tt>
- *
*/
public interface AddressControl
{
// Attributes ----------------------------------------------------
+ /**
+ * Returns the managed address.
+ */
String getAddress();
+ /**
+ * Returns the roles (name and permissions) associated to this address.
+ */
Object[] getRoles() throws Exception;
+ /**
+ * Returns the roles (name and permissions) associated to this address
+ * using JSON serialization.
+ */
String getRolesAsJSON() throws Exception;
+ /**
+ * Returns the names of the queues bound to this address.
+ */
String[] getQueueNames() throws Exception;
+ /**
+ * Returns the number of pages used by this address.
+ */
int getNumberOfPages() throws Exception;
+ /**
+ * Returns the number of bytes used by each page for this address.
+ */
long getNumberOfBytesPerPage() throws Exception;
// Operations ----------------------------------------------------
+ /**
+ * Adds a role to this address.
+ *
+ * @param name name of the role
+ * @param send can the user send to this address?
+ * @param consume can the user consume from a queue bound to this address?
+ * @param createDurableQueue can the user create a durable queue bound to this address?
+ * @param deleteDurableQueue can the user delete a durable queue bound to this address?
+ * @param createNonDurableQueue can the user create a non-durable queue bound to this address?
+ * @param deleteNonDurableQueue can the user delete a non-durable queue bound to this address?
+ * @param manage can the user send management messages to this address?
+ * @throws Exception if an exception occurred while adding the role
+ */
@Operation(desc = "Add a Role to this address")
void addRole(@Parameter(name = "name", desc = "Name of the role to add") String name,
- @Parameter(name = "send", desc = "Can the user send to an address?") boolean send,
+ @Parameter(name = "send", desc = "Can the user send to this address?") boolean send,
@Parameter(name = "consume", desc = "Can the user consume from this address?") boolean consume,
@Parameter(name = "createDurableQueue", desc = "Can the user create a durable queue?") boolean createDurableQueue,
@Parameter(name = "deleteDurableQueue", desc = "Can the user delete a durable queue?") boolean deleteDurableQueue,
@@ -47,6 +81,11 @@
@Parameter(name = "deleteNonDurableQueue", desc = "Can the user delete a temp queue?") boolean deleteNonDurableQueue,
@Parameter(name = "manage", desc = "Can the user send management messages?") boolean manage) throws Exception;
+ /**
+ * Removes the role corresponding to the specified name from this address.
+ *
+ * @throws Exception if an exception occurred while removing the role
+ */
@Operation(desc = "Remove a Role from this address")
void removeRole(@Parameter(name = "name", desc = "Name of the role to remove") String name) throws Exception;
}
Modified: trunk/src/main/org/hornetq/core/management/BridgeControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/BridgeControl.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/management/BridgeControl.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -13,35 +13,75 @@
package org.hornetq.core.management;
+
/**
- * A BridgeControlMBean
+ * A BridgeControl is used to manage a Bridge.
*
* @author <a href="jmesnil(a)redhat.com">Jeff Mesnil</a>
*
*/
public interface BridgeControl extends HornetQComponentControl
{
+ /**
+ * Returns the name of this bridge
+ */
String getName();
+ /**
+ * Returns the name of the queue this bridge is consuming messages from.
+ */
String getQueueName();
+ /**
+ * Returns the address this bridge will forward messages to.
+ */
String getForwardingAddress();
+ /**
+ * Returns the filter string associated to this bridge.
+ */
String getFilterString();
+ /**
+ * Return the name of the Transformer implementation associated to this bridge.
+ *
+ * @see org.hornetq.core.server.cluster.Transformer
+ */
String getTransformerClassName();
+ /**
+ * Returns the pair of connectors used by this bridge.
+ */
String[] getConnectorPair() throws Exception;
+ /**
+ * Returns the name of the discovery group used by this bridge.
+ */
String getDiscoveryGroupName();
+ /**
+ * Returns the retry interval used by this bridge.
+ */
long getRetryInterval();
+ /**
+ * Returns the retry interval multiplier used by this bridge.
+ */
double getRetryIntervalMultiplier();
+ /**
+ * Returns the number of reconnection attempts used by this bridge.
+ */
int getReconnectAttempts();
+ /**
+ * Returns whether the session used by this bridge will failover if
+ * the target server is normally shutdown.
+ */
boolean isFailoverOnServerShutdown();
+ /**
+ * Returns whether this bridge is using duplicate detection.
+ */
boolean isUseDuplicateDetection();
}
Modified: trunk/src/main/org/hornetq/core/management/BroadcastGroupControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/BroadcastGroupControl.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/management/BroadcastGroupControl.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -14,24 +14,46 @@
package org.hornetq.core.management;
/**
- * A BroadcastGroupControlMBean
+ * A BroadcastGroupControl is used to manage a broadcast group.
*
* @author <a href="jmesnil(a)redhat.com">Jeff Mesnil</a>
*
*/
public interface BroadcastGroupControl extends HornetQComponentControl
{
+ /**
+ * Returns the configuration name of this broadcast group.
+ */
String getName();
+ /**
+ * Returns the local port this broadcast group is bound to.
+ */
int getLocalBindPort();
+ /**
+ * Returns the address this broadcast group is broadcasting to.
+ */
String getGroupAddress();
+ /**
+ * Returns the port this broadcast group is broadcasting to.
+ */
int getGroupPort();
+ /**
+ * Returns the period used by this broadcast group.
+ */
long getBroadcastPeriod();
+ /**
+ * Returns the pairs of live-backup connectors that are broadcasted by this broadcast group.
+ */
Object[] getConnectorPairs();
+ /**
+ * Returns the pairs of live-backup connectors that are broadcasted by this broadcast group
+ * using JSON serialization.
+ */
String getConnectorPairsAsJSON() throws Exception;
}
Modified: trunk/src/main/org/hornetq/core/management/ClusterConnectionControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/ClusterConnectionControl.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/management/ClusterConnectionControl.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -16,32 +16,68 @@
import java.util.Map;
/**
- * A ClusterConnectionControlMBean
+ * A ClusterConnectionControl is used to manage a cluster connection.
*
* @author <a href="jmesnil(a)redhat.com">Jeff Mesnil</a>
*
*/
public interface ClusterConnectionControl extends HornetQComponentControl
{
+ /**
+ * Returns the configuration name of this cluster connection.
+ */
String getName();
+ /**
+ * Returns the address used by this cluster connection.
+ */
String getAddress();
+ /**
+ * Returns the node ID used by this cluster connection.
+ */
String getNodeID();
+ /**
+ * Return whether this cluster connection use duplicate detection.
+ */
boolean isDuplicateDetection();
+ /**
+ * Return whether this cluster connection forward messages when it has no local consumers.
+ */
boolean isForwardWhenNoConsumers();
+ /**
+ * Returns the maximum number of hops used by this cluster connection.
+ */
int getMaxHops();
+ /**
+ * Returns the pairs of live-backup connectors used by this cluster connection.
+ */
Object[] getStaticConnectorNamePairs();
+ /**
+ * Returns the pairs of live-backup connectors used by this cluster connection
+ * using JSON serialization.
+ */
String getStaticConnectorNamePairsAsJSON() throws Exception;
+ /**
+ * Returns the name of the discovery group used by this cluster connection.
+ */
String getDiscoveryGroupName();
+ /**
+ * Returns the connection retry interval used by this cluster connection.
+ */
long getRetryInterval();
+ /**
+ * Returns a map of the nodes connected to this cluster connection.
+ * <br>
+ * keys are node IDs, values are the addresses used to connect to the nodes.
+ */
Map<String, String> getNodes() throws Exception;
}
Modified: trunk/src/main/org/hornetq/core/management/DayCounterInfo.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/DayCounterInfo.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/management/DayCounterInfo.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -22,9 +22,6 @@
/**
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
- *
- * @version <tt>$Revision$</tt>
- *
*/
public class DayCounterInfo
{
@@ -87,6 +84,8 @@
// Public --------------------------------------------------------
+ /**
+ */
public String getDate()
{
return date;
Modified: trunk/src/main/org/hornetq/core/management/DiscoveryGroupControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/DiscoveryGroupControl.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/management/DiscoveryGroupControl.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -14,18 +14,30 @@
package org.hornetq.core.management;
/**
- * A DiscoveryGroupControlMBean
+ * A DiscoveryGroupControl is used to manage a discovery group.
*
* @author <a href="jmesnil(a)redhat.com">Jeff Mesnil</a>
*
*/
public interface DiscoveryGroupControl extends HornetQComponentControl
{
+ /**
+ * Returns the configuration name of this discovery group.
+ */
String getName();
+ /**
+ * Returns the address that this discovery group is listening to.
+ */
String getGroupAddress();
+ /**
+ * Returns the port that this discovery group is listening to.
+ */
int getGroupPort();
+ /**
+ * Returns the refresh timeout used by this discovery group.
+ */
long getRefreshTimeout();
}
Modified: trunk/src/main/org/hornetq/core/management/DivertControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/DivertControl.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/management/DivertControl.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -14,24 +14,49 @@
package org.hornetq.core.management;
/**
+ * A DivertControl is used to manage a divert.
+ *
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
- *
- * @version <tt>$Revision$</tt>
- *
*/
public interface DivertControl
{
+ /**
+ * Returns the filter used by this divert.
+ */
String getFilter();
+ /**
+ * Returns whether this divert is exclusive.
+ * <br>
+ * if {@code true} messages will be exclusively diverted and will not be routed to the origin address,
+ * else messages will be routed both to the origin address and the forwarding address.
+ */
boolean isExclusive();
+ /**
+ * Returns the cluster-wide unique name of this divert.
+ */
String getUniqueName();
+ /**
+ * Returns the routing name of this divert.
+ */
String getRoutingName();
+ /**
+ * Returns the origin address used by this divert.
+ */
String getAddress();
+ /**
+ * Returns the forwarding address used by this divert.
+ */
String getForwardingAddress();
+ /**
+ * Return the name of the Transformer implementation associated to this bridge.
+ *
+ * @see org.hornetq.core.server.cluster.Transformer
+ */
String getTransformerClassName();
}
Modified: trunk/src/main/org/hornetq/core/management/HornetQComponentControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/HornetQComponentControl.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/management/HornetQComponentControl.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -14,15 +14,24 @@
package org.hornetq.core.management;
/**
- * A HornetQComponentControl
+ * A HornetQComponentControl is used to manage the lifecycle of a HornetQ components.
*
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
*/
public interface HornetQComponentControl
{
+ /**
+ * Returns {@code true} if this component is started, {@code false} else.
+ */
boolean isStarted();
+ /**
+ * Starts this component.
+ */
void start() throws Exception;
+ /**
+ * Stops this component.
+ */
void stop() throws Exception;
}
Modified: trunk/src/main/org/hornetq/core/management/HornetQServerControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/HornetQServerControl.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/management/HornetQServerControl.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -15,188 +15,462 @@
import javax.management.MBeanOperationInfo;
-import org.hornetq.core.config.Configuration;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.remoting.Interceptor;
+import org.hornetq.core.server.management.Operation;
+import org.hornetq.core.server.management.Parameter;
/**
- * This interface describes the core management interface exposed by the server
+ * A HornetQServerControl is used to manage HornetQ servers.
*/
public interface HornetQServerControl
{
// Attributes ----------------------------------------------------
+ /**
+ * Returns the name of the connector used to connect to the backup.
+ * <br>
+ * If this server has no backup or is itself a backup, the value is {@code null}.
+ */
String getBackupConnectorName();
+ /**
+ * Returns this server's version.
+ */
String getVersion();
+ /**
+ * Returns the number of connections connected to this server.
+ */
int getConnectionCount();
+ /**
+ * Return whether this server is started.
+ */
boolean isStarted();
+ /**
+ * Returns the list of interceptors used by this server.
+ *
+ * @see Interceptor
+ */
String[] getInterceptorClassNames();
+ /**
+ * Returns whether this server is clustered.
+ */
boolean isClustered();
+ /**
+ * Returns the maximum number of threads in the <em>scheduled</em> thread pool.
+ */
int getScheduledThreadPoolMaxSize();
+ /**
+ * Returns the maximum number of threads in the thread pool.
+ */
int getThreadPoolMaxSize();
+ /**
+ * Returns the interval time (in milliseconds) to invalidate security credentials.
+ */
long getSecurityInvalidationInterval();
+ /**
+ * Returns whether security is enabled for this server.
+ */
boolean isSecurityEnabled();
+ /**
+ * Returns the file system directory used to store bindings.
+ */
String getBindingsDirectory();
+ /**
+ * Returns the file system directory used to store journal log.
+ */
String getJournalDirectory();
+ /**
+ * Returns the type of journal used by this server (either {@code NIO} or {@code ASYNCIO}).
+ */
String getJournalType();
+ /**
+ * Returns whether the journal is synchronized when receiving transactional data.
+ */
boolean isJournalSyncTransactional();
+ /**
+ * Returns whether the journal is synchronized when receiving non-transactional data.
+ */
boolean isJournalSyncNonTransactional();
+ /**
+ * Returns the size (in bytes) of each journal files.
+ */
int getJournalFileSize();
+ /**
+ * Returns the number of journal files to pre-create.
+ */
int getJournalMinFiles();
+ /**
+ * Returns the maximum number of write requests that can be in the AIO queue at any given time.
+ */
int getJournalMaxIO();
+ /**
+ * Returns the size of the internal buffer on the journal.
+ */
int getJournalBufferSize();
+ /**
+ * Returns the timeout (in nanoseconds) used to flush internal buffers on the journal.
+ */
int getJournalBufferTimeout();
+ /**
+ * Returns the minimal number of journal files before compacting.
+ */
int getJournalCompactMinFiles();
+ /**
+ * Return the percentage of live data before compacting the journal.
+ */
int getJournalCompactPercentage();
+ /**
+ * Returns whether this server is using persistence and store data.
+ */
boolean isPersistenceEnabled();
+ /**
+ * Returns whether the bindings directory is created on this server startup.
+ */
boolean isCreateBindingsDir();
+ /**
+ * Returns whether the journal directory is created on this server startup.
+ */
boolean isCreateJournalDir();
- Configuration getConfiguration();
-
+ /**
+ * Returns whether message counter is enabled for this server.
+ */
boolean isMessageCounterEnabled();
+ /**
+ * Returns the maximum number of days kept in memory for message counter.
+ */
int getMessageCounterMaxDayCount();
+ /**
+ * Sets the maximum number of days kept in memory for message counter.
+ *
+ * @count value must be greater than 0
+ */
void setMessageCounterMaxDayCount(int count) throws Exception;
+ /**
+ * Returns the sample period (in milliseconds) to take message counter snapshot.
+ */
long getMessageCounterSamplePeriod();
+ /**
+ * Sets the sample period to take message counter snapshot.
+ *
+ * @param newPeriod value must be greater than 1000ms
+ */
void setMessageCounterSamplePeriod(long newPeriod) throws Exception;
+ /**
+ * Returns {@code true} if this server is a backup, {@code false} if it is a live server.
+ * <br>
+ * If a backup server has been activated, returns {@code false}.
+ */
boolean isBackup();
+ /**
+ * Returns whether this server shares its data store with a corresponding live or backup server.
+ */
boolean isSharedStore();
+ /**
+ * Returns the file system directory used to store paging files.
+ */
String getPagingDirectory();
+ /**
+ * Returns whether delivery count is persisted before messages are delivered to the consumers.
+ */
boolean isPersistDeliveryCountBeforeDelivery();
+ /**
+ * Returns the connection time to live.
+ * <br>
+ * This value overrides the connection time to live <em>sent by the client</em>.
+ */
long getConnectionTTLOverride();
+ /**
+ * Returns the management address of this server.
+ * <br>
+ * Clients can send management messages to this address to manage this server.
+ */
String getManagementAddress();
+ /**
+ * Returns the management notification address of this server.
+ * <br>
+ * Clients can bind queues to this address to receive management notifications emitted by this server.
+ */
String getManagementNotificationAddress();
- long getManagementRequestTimeout();
-
+ /**
+ * Returns the size of the cache for pre-creating message IDs.
+ */
int getIDCacheSize();
+ /**
+ * Returns whether message ID cache is persisted.
+ */
boolean isPersistIDCache();
+ /**
+ * Returns the file system directory used to store large messages.
+ */
String getLargeMessagesDirectory();
+ /**
+ * Returns whether wildcard routing is supported by this server.
+ */
boolean isWildcardRoutingEnabled();
+ /**
+ * Returns the timeout (in milliseconds) after which transactions is removed
+ * from the resource manager after it was created.
+ */
long getTransactionTimeout();
+ /**
+ * Returns the frequency (in milliseconds) to scan transactions to detect which transactions
+ * have timed out.
+ */
long getTransactionTimeoutScanPeriod();
+ /**
+ * Returns the frequency (in milliseconds) to scan messages to detect which messages
+ * have expired.
+ */
long getMessageExpiryScanPeriod();
+ /**
+ * Returns the priority of the thread used to scan message expiration.
+ */
long getMessageExpiryThreadPriority();
+ /**
+ * Returns the connectors configured for this server.
+ */
Object[] getConnectors() throws Exception;
+ /**
+ * Returns the connectors configured for this server using JSON serialization.
+ */
String getConnectorsAsJSON() throws Exception;
+ /**
+ * Returns the addresses created on this server.
+ */
String[] getAddressNames();
+ /**
+ * Returns the names of the queues created on this server.
+ */
String[] getQueueNames();
// Operations ----------------------------------------------------
+ /**
+ * Create a durable queue.
+ * <br>
+ * This method throws a {@link HornetQException#QUEUE_EXISTS}) exception if the queue already exits.
+ *
+ * @param address address to bind the queue to
+ * @param name name of the queue
+ */
@Operation(desc = "Create a queue with the specified address", impact = MBeanOperationInfo.ACTION)
void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
@Parameter(name = "name", desc = "Name of the queue") String name) throws Exception;
+ /**
+ * Create a queue.
+ * <br>
+ * This method throws a {@link HornetQException#QUEUE_EXISTS}) exception if the queue already exits.
+ *
+ * @param address address to bind the queue to
+ * @param name name of the queue
+ * @param filter of the queue
+ * @param durable whether the queue is durable
+ */
@Operation(desc = "Create a queue", impact = MBeanOperationInfo.ACTION)
void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
@Parameter(name = "name", desc = "Name of the queue") String name,
@Parameter(name = "filter", desc = "Filter of the queue") String filter,
@Parameter(name = "durable", desc = "Is the queue durable?") boolean durable) throws Exception;
+ /**
+ * Create a queue.
+ * <br>
+ * This method throws a {@link HornetQException#QUEUE_EXISTS}) exception if the queue already exits.
+ *
+ * @param address address to bind the queue to
+ * @param name name of the queue
+ * @param durable whether the queue is durable
+ */
@Operation(desc = "Create a queue with the specified address, name and durability", impact = MBeanOperationInfo.ACTION)
void createQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
@Parameter(name = "name", desc = "Name of the queue") String name,
@Parameter(name = "durable", desc = "Is the queue durable?") boolean durable) throws Exception;
+ /**
+ * Deploy a durable queue.
+ * <br>
+ * This method will do nothing if the queue with the given name already exists on the server.
+ *
+ * @param address address to bind the queue to
+ * @param name name of the queue
+ * @param filter of the queue
+ */
@Operation(desc = "Deploy a queue", impact = MBeanOperationInfo.ACTION)
void deployQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
@Parameter(name = "name", desc = "Name of the queue") String name,
- String filterString) throws Exception;
+ @Parameter(name = "filter", desc = "Filter of the queue")String filter) throws Exception;
+ /**
+ * Deploy a queue.
+ * <br>
+ * This method will do nothing if the queue with the given name already exists on the server.
+ *
+ * @param address address to bind the queue to
+ * @param name name of the queue
+ * @param filter of the queue
+ * @param durable whether the queue is durable
+ */
@Operation(desc = "Deploy a queue", impact = MBeanOperationInfo.ACTION)
void deployQueue(@Parameter(name = "address", desc = "Address of the queue") String address,
@Parameter(name = "name", desc = "Name of the queue") String name,
@Parameter(name = "filter", desc = "Filter of the queue") String filter,
@Parameter(name = "durable", desc = "Is the queue durable?") boolean durable) throws Exception;
+ /**
+ * Destroys the queue corresponding to the specified name.
+ */
@Operation(desc = "Destroy a queue", impact = MBeanOperationInfo.ACTION)
void destroyQueue(@Parameter(name = "name", desc = "Name of the queue to destroy") String name) throws Exception;
+ /**
+ * Enables message counters for this server.
+ */
@Operation(desc = "Enable message counters", impact = MBeanOperationInfo.ACTION)
void enableMessageCounters() throws Exception;
+ /**
+ * Disables message counters for this server.
+ */
@Operation(desc = "Disable message counters", impact = MBeanOperationInfo.ACTION)
void disableMessageCounters() throws Exception;
+ /**
+ * Reset all message counters.
+ */
@Operation(desc = "Reset all message counters", impact = MBeanOperationInfo.ACTION)
void resetAllMessageCounters() throws Exception;
+ /**
+ * Reset histories for all message counters.
+ */
@Operation(desc = "Reset all message counters history", impact = MBeanOperationInfo.ACTION)
void resetAllMessageCounterHistories() throws Exception;
+ /**
+ * List all the prepared transaction, sorted by date, oldest first.
+ * <br>
+ * The Strings are Base-64 representation of the transaction XID and can be
+ * used to heuristically commit or rollback the transactions.
+ *
+ * @see #commitPreparedTransaction(String), {@link #rollbackPreparedTransaction(String)}
+ */
@Operation(desc = "List all the prepared transaction, sorted by date, oldest first")
String[] listPreparedTransactions() throws Exception;
+ /**
+ * List transactions which have been heuristically committed.
+ */
String[] listHeuristicCommittedTransactions() throws Exception;
+ /**
+ * List transactions which have been heuristically rolled back.
+ */
String[] listHeuristicRolledBackTransactions() throws Exception;
+ /**
+ * Heuristically commits a prepared transaction.
+ *
+ * @param transactionAsBase64 base 64 representation of a prepare transaction
+ * @return {@code true} if the transaction was successfully committed, {@code false} else
+ *
+ * @see #listPreparedTransactions()
+ */
@Operation(desc = "Commit a prepared transaction")
boolean commitPreparedTransaction(@Parameter(desc = "the Base64 representation of a transaction", name = "transactionAsBase64") String transactionAsBase64) throws Exception;
+ /**
+ * Heuristically rolls back a prepared transaction.
+ *
+ * @param transactionAsBase64 base 64 representation of a prepare transaction
+ * @return {@code true} if the transaction was successfully rolled back, {@code false} else
+ *
+ * @see #listPreparedTransactions()
+ */
@Operation(desc = "Rollback a prepared transaction")
boolean rollbackPreparedTransaction(@Parameter(desc = "the Base64 representation of a transaction", name = "transactionAsBase64") String transactionAsBase64) throws Exception;
+ /**
+ * Lists the addresses of all the clients connected to this address.
+ */
@Operation(desc = "List the client addresses", impact = MBeanOperationInfo.INFO)
String[] listRemoteAddresses() throws Exception;
+ /**
+ * Lists the addresses of the clients connected to this address which matches the specified IP address.
+ */
@Operation(desc = "List the client addresses which match the given IP Address", impact = MBeanOperationInfo.INFO)
String[] listRemoteAddresses(@Parameter(desc = "an IP address", name = "ipAddress") String ipAddress) throws Exception;
+ /**
+ * Closes all the connections of clients connected to this server which matches the specified IP address.
+ */
@Operation(desc = "Closes all the connections for the given IP Address", impact = MBeanOperationInfo.INFO)
boolean closeConnectionsForAddress(@Parameter(desc = "an IP address", name = "ipAddress") String ipAddress) throws Exception;
+ /**
+ * Lists all the IDs of the connections connected to this server.
+ */
@Operation(desc = "List all the connection IDs", impact = MBeanOperationInfo.INFO)
String[] listConnectionIDs() throws Exception;
+ /**
+ * Lists all the sessions IDs for the specified connection ID.
+ */
@Operation(desc = "List the sessions for the given connectionID", impact = MBeanOperationInfo.INFO)
String[] listSessions(@Parameter(desc = "a connection ID", name = "connectionID") String connectionID) throws Exception;
+ /**
+ * This method is used by HornetQ clustering and must not be called by HornetQ clients.
+ */
void sendQueueInfoToQueue(String queueName, String address) throws Exception;
}
Deleted: trunk/src/main/org/hornetq/core/management/ManagementService.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/ManagementService.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/management/ManagementService.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -1,137 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.management;
-
-import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-
-import javax.management.ObjectName;
-
-import org.hornetq.core.cluster.DiscoveryGroup;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.TransportConfiguration;
-import org.hornetq.core.config.cluster.BridgeConfiguration;
-import org.hornetq.core.config.cluster.BroadcastGroupConfiguration;
-import org.hornetq.core.config.cluster.ClusterConnectionConfiguration;
-import org.hornetq.core.config.cluster.DiscoveryGroupConfiguration;
-import org.hornetq.core.config.cluster.DivertConfiguration;
-import org.hornetq.core.management.impl.HornetQServerControlImpl;
-import org.hornetq.core.messagecounter.MessageCounterManager;
-import org.hornetq.core.paging.PagingManager;
-import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.postoffice.PostOffice;
-import org.hornetq.core.remoting.server.RemotingService;
-import org.hornetq.core.remoting.spi.Acceptor;
-import org.hornetq.core.security.Role;
-import org.hornetq.core.server.Divert;
-import org.hornetq.core.server.HornetQComponent;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.Queue;
-import org.hornetq.core.server.QueueFactory;
-import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.server.cluster.Bridge;
-import org.hornetq.core.server.cluster.BroadcastGroup;
-import org.hornetq.core.server.cluster.ClusterConnection;
-import org.hornetq.core.settings.HierarchicalRepository;
-import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.core.transaction.ResourceManager;
-import org.hornetq.utils.SimpleString;
-
-/**
- * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
- *
- * @version <tt>$Revision$</tt>
- *
- */
-public interface ManagementService extends NotificationService, HornetQComponent
-{
- // Configuration
-
- MessageCounterManager getMessageCounterManager();
-
- String getClusterUser();
-
- String getClusterPassword();
-
- SimpleString getManagementAddress();
-
- SimpleString getManagementNotificationAddress();
-
- ObjectNameBuilder getObjectNameBuilder();
-
- // Resource Registration
-
- void setStorageManager(StorageManager storageManager);
-
- HornetQServerControlImpl registerServer(PostOffice postOffice,
- StorageManager storageManager,
- Configuration configuration,
- HierarchicalRepository<AddressSettings> addressSettingsRepository,
- HierarchicalRepository<Set<Role>> securityRepository,
- ResourceManager resourceManager,
- RemotingService remotingService,
- HornetQServer messagingServer,
- QueueFactory queueFactory,
- ScheduledExecutorService scheduledThreadPool,
- final PagingManager pagingManager,
- boolean backup) throws Exception;
-
- void unregisterServer() throws Exception;
-
- void registerInJMX(ObjectName objectName, Object managedResource) throws Exception;
-
- void unregisterFromJMX(final ObjectName objectName) throws Exception;
-
- void registerInRegistry(String resourceName, Object managedResource);
-
- void unregisterFromRegistry(final String resourceName);
-
- void registerAddress(SimpleString address) throws Exception;
-
- void unregisterAddress(SimpleString address) throws Exception;
-
- void registerQueue(Queue queue, SimpleString address, StorageManager storageManager) throws Exception;
-
- void unregisterQueue(SimpleString name, SimpleString address) throws Exception;
-
- void registerAcceptor(Acceptor acceptor, TransportConfiguration configuration) throws Exception;
-
- void unregisterAcceptors();
-
- void registerDivert(Divert divert, DivertConfiguration config) throws Exception;
-
- void unregisterDivert(SimpleString name) throws Exception;
-
- void registerBroadcastGroup(BroadcastGroup broadcastGroup, BroadcastGroupConfiguration configuration) throws Exception;
-
- void unregisterBroadcastGroup(String name) throws Exception;
-
- void registerDiscoveryGroup(DiscoveryGroup discoveryGroup, DiscoveryGroupConfiguration configuration) throws Exception;
-
- void unregisterDiscoveryGroup(String name) throws Exception;
-
- void registerBridge(Bridge bridge, BridgeConfiguration configuration) throws Exception;
-
- void unregisterBridge(String name) throws Exception;
-
- void registerCluster(ClusterConnection cluster, ClusterConnectionConfiguration configuration) throws Exception;
-
- void unregisterCluster(String name) throws Exception;
-
- Object getResource(String resourceName);
-
- Object[] getResources(Class<?> resourceType);
-
- ServerMessage handleMessage(ServerMessage message) throws Exception;
-}
Deleted: trunk/src/main/org/hornetq/core/management/Notification.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/Notification.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/management/Notification.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -1,62 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.management;
-
-import org.hornetq.utils.TypedProperties;
-
-/**
- * A Notification
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- * Created 22 Jan 2009 16:41:12
- *
- *
- */
-public class Notification
-{
- private final NotificationType type;
-
- private final TypedProperties properties;
-
- private final String uid;
-
- public Notification(final String uid, final NotificationType type, final TypedProperties properties)
- {
- this.uid = uid;
- this.type = type;
- this.properties = properties;
- }
-
- public NotificationType getType()
- {
- return type;
- }
-
- public TypedProperties getProperties()
- {
- return properties;
- }
-
- public String getUID()
- {
- return uid;
- }
-
- @Override
- public String toString()
- {
- return "Notification[uid=" + uid + ", type=" + type + ", properties=" + properties + "]";
- }
-}
Deleted: trunk/src/main/org/hornetq/core/management/NotificationListener.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/NotificationListener.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/management/NotificationListener.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -1,28 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.management;
-
-/**
- * A NotificationListener
- *
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
- * Created 22 Jan 2009 16:48:27
- *
- *
- */
-public interface NotificationListener
-{
- void onNotification(Notification notification);
-}
Deleted: trunk/src/main/org/hornetq/core/management/NotificationService.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/NotificationService.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/management/NotificationService.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -1,47 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.management;
-
-import org.hornetq.core.client.management.impl.ManagementHelper;
-
-/**
- * A NotificationService
- *
- * @author jmesnil
- *
- *
- */
-public interface NotificationService
-{
- /**
- * the message corresponding to a notification will always contain the properties:
- * <ul>
- * <li><code>ManagementHelper.HDR_NOTIFICATION_TYPE</code> - the type of notification (SimpleString)</li>
- * <li><code>ManagementHelper.HDR_NOTIFICATION_MESSAGE</code> - a message contextual to the notification (SimpleString)</li>
- * <li><code>ManagementHelper.HDR_NOTIFICATION_TIMESTAMP</code> - the timestamp when the notification occured (long)</li>
- * </ul>
- *
- * in addition to the properties defined in <code>props</code>
- *
- * @see ManagementHelper
- */
- void sendNotification(Notification notification) throws Exception;
-
- void enableNotifications(boolean enable);
-
- void addNotificationListener(NotificationListener listener);
-
- void removeNotificationListener(NotificationListener listener);
-
-}
Modified: trunk/src/main/org/hornetq/core/management/ObjectNameBuilder.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/ObjectNameBuilder.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/management/ObjectNameBuilder.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -16,10 +16,14 @@
import javax.management.ObjectName;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.jms.server.management.ConnectionFactoryControl;
+import org.hornetq.jms.server.management.JMSQueueControl;
+import org.hornetq.jms.server.management.JMSServerControl;
+import org.hornetq.jms.server.management.TopicControl;
import org.hornetq.utils.SimpleString;
/**
- * A ObjectNameBuilder
+ * Helper class to build ObjectNames for HornetQ resources.
*
* @author <a href="jmesnil(a)redhat.com">Jeff Mesnil</a>
*
@@ -29,6 +33,9 @@
// Constants -----------------------------------------------------
+ /**
+ * Default JMX domain for HornetQ resources.
+ */
public static ObjectNameBuilder DEFAULT = new ObjectNameBuilder(ConfigurationImpl.DEFAULT_JMX_DOMAIN);
public static final String JMS_MODULE = "JMS";
@@ -55,16 +62,29 @@
// Public --------------------------------------------------------
+ /**
+ * Returns the ObjectName used by the single HornetQServerControl.
+ */
public ObjectName getHornetQServerObjectName() throws Exception
{
return ObjectName.getInstance(domain + ":module=Core,type=Server");
}
+ /**
+ * Returns the ObjectName used by AddressControl.
+ *
+ * @see AddressControl
+ */
public ObjectName getAddressObjectName(final SimpleString address) throws Exception
{
return createObjectName(ObjectNameBuilder.CORE_MODULE, "Address", address.toString());
}
+ /**
+ * Returns the ObjectName used by QueueControl.
+ *
+ * @see QueueControl
+ */
public ObjectName getQueueObjectName(final SimpleString address, final SimpleString name) throws Exception
{
return ObjectName.getInstance(String.format("%s:module=%s,type=%s,address=%s,name=%s",
@@ -75,51 +95,101 @@
ObjectName.quote(name.toString())));
}
+ /**
+ * Returns the ObjectName used by DivertControl.
+ *
+ * @see DivertControl
+ */
public ObjectName getDivertObjectName(final SimpleString name) throws Exception
{
return createObjectName(ObjectNameBuilder.CORE_MODULE, "Divert", name.toString());
}
+ /**
+ * Returns the ObjectName used by AcceptorControl.
+ *
+ * @see AcceptorControl
+ */
public ObjectName getAcceptorObjectName(final String name) throws Exception
{
return createObjectName(ObjectNameBuilder.CORE_MODULE, "Acceptor", name);
}
+ /**
+ * Returns the ObjectName used by BroadcastGroupControl.
+ *
+ * @see BroadcastGroupControl
+ */
public ObjectName getBroadcastGroupObjectName(final String name) throws Exception
{
return createObjectName(ObjectNameBuilder.CORE_MODULE, "BroadcastGroup", name);
}
+ /**
+ * Returns the ObjectName used by BridgeControl.
+ *
+ * @see BridgeControl
+ */
public ObjectName getBridgeObjectName(final String name) throws Exception
{
return createObjectName(ObjectNameBuilder.CORE_MODULE, "JMSBridge", name);
}
+ /**
+ * Returns the ObjectName used by ClusterConnectionControl.
+ *
+ * @see ClusterConnectionControl
+ */
public ObjectName getClusterConnectionObjectName(final String name) throws Exception
{
return createObjectName(ObjectNameBuilder.CORE_MODULE, "ClusterConnection", name);
}
+ /**
+ * Returns the ObjectName used by DiscoveryGroupControl.
+ *
+ * @see DiscoveryGroupControl
+ */
public ObjectName getDiscoveryGroupObjectName(final String name) throws Exception
{
return createObjectName(ObjectNameBuilder.CORE_MODULE, "DiscoveryGroup", name);
}
+ /**
+ * Returns the ObjectName used by JMSServerControl.
+ *
+ * @see JMSServerControl
+ */
public ObjectName getJMSServerObjectName() throws Exception
{
return ObjectName.getInstance(domain + ":module=JMS,type=Server");
}
+ /**
+ * Returns the ObjectName used by JMSQueueControl.
+ *
+ * @see JMSQueueControl
+ */
public ObjectName getJMSQueueObjectName(final String name) throws Exception
{
return createObjectName(ObjectNameBuilder.JMS_MODULE, "Queue", name);
}
+ /**
+ * Returns the ObjectName used by TopicControl.
+ *
+ * @see TopicControl
+ */
public ObjectName getJMSTopicObjectName(final String name) throws Exception
{
return createObjectName(ObjectNameBuilder.JMS_MODULE, "Topic", name);
}
+ /**
+ * Returns the ObjectName used by ConnectionFactoryControl.
+ *
+ * @see ConnectionFactoryControl
+ */
public ObjectName getConnectionFactoryObjectName(final String name) throws Exception
{
return createObjectName(ObjectNameBuilder.JMS_MODULE, "ConnectionFactory", name);
Deleted: trunk/src/main/org/hornetq/core/management/Operation.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/Operation.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/management/Operation.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -1,43 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.management;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Inherited;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-import javax.management.MBeanOperationInfo;
-
-/**
- * Info for a MBean Operation.
- *
- * This annotation is used only for methods which can be invoked
- * through a GUI.
- *
- * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
- *
- * @version <tt>$Revision$</tt>
- *
- */
-(a)Retention(RetentionPolicy.RUNTIME)
-(a)Target(ElementType.METHOD)
-@Inherited
-public @interface Operation
-{
- String desc();
-
- int impact() default MBeanOperationInfo.INFO;
-}
Deleted: trunk/src/main/org/hornetq/core/management/Parameter.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/Parameter.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/management/Parameter.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -1,36 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.management;
-
-import java.lang.annotation.ElementType;
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-import java.lang.annotation.Target;
-
-/**
- * Info for a MBean Operation Parameter.
- *
- * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
- *
- * @version <tt>$Revision$</tt>
- *
- */
-(a)Retention(RetentionPolicy.RUNTIME)
-(a)Target(ElementType.PARAMETER)
-public @interface Parameter
-{
- String name();
-
- String desc() default "N/A";
-}
Modified: trunk/src/main/org/hornetq/core/management/QueueControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/QueueControl.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/management/QueueControl.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -17,6 +17,9 @@
import javax.management.MBeanOperationInfo;
+import org.hornetq.core.server.management.Operation;
+import org.hornetq.core.server.management.Parameter;
+
/**
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
*
Modified: trunk/src/main/org/hornetq/core/management/ResourceNames.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/ResourceNames.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/management/ResourceNames.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -14,11 +14,9 @@
package org.hornetq.core.management;
/**
- * A ResourceNames
+ * Helper class used to build resource names used by management messages.
*
- * @author jmesnil
- *
- *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
*/
public class ResourceNames
{
Modified: trunk/src/main/org/hornetq/core/management/RoleInfo.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/RoleInfo.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/management/RoleInfo.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -17,6 +17,9 @@
import org.hornetq.utils.json.JSONObject;
/**
+ * Helper class to create Java Objects from the
+ * JSON serialization returned by {@link AddressControl#getRolesAsJSON()}.
+ *
* @author <a href="jmesnil(a)redhat.com">Jeff Mesnil</a>
*/
public class RoleInfo
@@ -37,6 +40,10 @@
final private boolean manage;
+ /**
+ * Returns an array of RoleInfo corresponding to the JSON serialization returned
+ * by {@link AddressControl#getRolesAsJSON()}.
+ */
public static final RoleInfo[] from(final String jsonString) throws Exception
{
JSONArray array = new JSONArray(jsonString);
@@ -76,41 +83,65 @@
this.manage = manage;
}
+ /**
+ * Returns the name of the role.
+ */
public String getName()
{
return name;
}
+ /**
+ * Returns whether this role can send messages to the address.
+ */
public boolean isSend()
{
return send;
}
+ /**
+ * Returns whether this role can consume messages from queues bound to the address.
+ */
public boolean isConsume()
{
return consume;
}
+ /**
+ * Returns whether this role can create durable queues bound to the address.
+ */
public boolean isCreateDurableQueue()
{
return createDurableQueue;
}
+ /**
+ * Returns whether this role can delete durable queues bound to the address.
+ */
public boolean isDeleteDurableQueue()
{
return deleteDurableQueue;
}
+ /**
+ * Returns whether this role can create non-durable queues bound to the address.
+ */
public boolean isCreateNonDurableQueue()
{
return createNonDurableQueue;
}
+ /**
+ * Returns whether this role can delete non-durable queues bound to the address.
+ */
public boolean isDeleteNonDurableQueue()
{
return deleteNonDurableQueue;
}
+ /**
+ * Returns whether this role can send management messages to the address.
+ */
public boolean isManage()
{
return manage;
Modified: trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -1173,11 +1173,6 @@
return configuration.getManagementNotificationAddress().toString();
}
- public long getManagementRequestTimeout()
- {
- return configuration.getManagementRequestTimeout();
- }
-
public long getMessageExpiryScanPeriod()
{
return configuration.getMessageExpiryScanPeriod();
Modified: trunk/src/main/org/hornetq/core/management/impl/MBeanInfoHelper.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/MBeanInfoHelper.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/management/impl/MBeanInfoHelper.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -21,8 +21,8 @@
import javax.management.MBeanOperationInfo;
import javax.management.MBeanParameterInfo;
-import org.hornetq.core.management.Operation;
-import org.hornetq.core.management.Parameter;
+import org.hornetq.core.server.management.Operation;
+import org.hornetq.core.server.management.Parameter;
/**
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
Deleted: trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -1,862 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.management.impl;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ScheduledExecutorService;
-
-import javax.management.MBeanServer;
-import javax.management.NotificationBroadcasterSupport;
-import javax.management.ObjectName;
-import javax.management.StandardMBean;
-
-import org.hornetq.core.client.management.impl.ManagementHelper;
-import org.hornetq.core.cluster.DiscoveryGroup;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.TransportConfiguration;
-import org.hornetq.core.config.cluster.BridgeConfiguration;
-import org.hornetq.core.config.cluster.BroadcastGroupConfiguration;
-import org.hornetq.core.config.cluster.ClusterConnectionConfiguration;
-import org.hornetq.core.config.cluster.DiscoveryGroupConfiguration;
-import org.hornetq.core.config.cluster.DivertConfiguration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
-import org.hornetq.core.logging.Logger;
-import org.hornetq.core.management.AcceptorControl;
-import org.hornetq.core.management.BridgeControl;
-import org.hornetq.core.management.BroadcastGroupControl;
-import org.hornetq.core.management.ClusterConnectionControl;
-import org.hornetq.core.management.DiscoveryGroupControl;
-import org.hornetq.core.management.DivertControl;
-import org.hornetq.core.management.ManagementService;
-import org.hornetq.core.management.Notification;
-import org.hornetq.core.management.NotificationListener;
-import org.hornetq.core.management.ObjectNameBuilder;
-import org.hornetq.core.management.ResourceNames;
-import org.hornetq.core.messagecounter.MessageCounter;
-import org.hornetq.core.messagecounter.MessageCounterManager;
-import org.hornetq.core.messagecounter.impl.MessageCounterManagerImpl;
-import org.hornetq.core.paging.PagingManager;
-import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.postoffice.PostOffice;
-import org.hornetq.core.remoting.server.RemotingService;
-import org.hornetq.core.remoting.spi.Acceptor;
-import org.hornetq.core.security.Role;
-import org.hornetq.core.server.Divert;
-import org.hornetq.core.server.HornetQServer;
-import org.hornetq.core.server.Queue;
-import org.hornetq.core.server.QueueFactory;
-import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.server.cluster.Bridge;
-import org.hornetq.core.server.cluster.BroadcastGroup;
-import org.hornetq.core.server.cluster.ClusterConnection;
-import org.hornetq.core.server.impl.ServerMessageImpl;
-import org.hornetq.core.settings.HierarchicalRepository;
-import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.core.transaction.ResourceManager;
-import org.hornetq.utils.SimpleString;
-import org.hornetq.utils.TypedProperties;
-
-/*
- * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
- * @author <a href="mailto:fox@redhat.com">Tim Fox</a>
- *
- * @version <tt>$Revision$</tt>
- */
-public class ManagementServiceImpl implements ManagementService
-{
- // Constants -----------------------------------------------------
-
- private static final Logger log = Logger.getLogger(ManagementServiceImpl.class);
-
- private final MBeanServer mbeanServer;
-
- private final boolean jmxManagementEnabled;
-
- private final Map<String, Object> registry;
-
- private final NotificationBroadcasterSupport broadcaster;
-
- private PostOffice postOffice;
-
- private PagingManager pagingManager;
-
- private StorageManager storageManager;
-
- private HornetQServer messagingServer;
-
- private HierarchicalRepository<Set<Role>> securityRepository;
-
- private HierarchicalRepository<AddressSettings> addressSettingsRepository;
-
- private HornetQServerControlImpl messagingServerControl;
-
- private MessageCounterManager messageCounterManager;
-
- private final SimpleString managementNotificationAddress;
-
- private final SimpleString managementAddress;
-
- private final String managementClusterUser;
-
- private final String managementClusterPassword;
-
- private final long managementRequestTimeout;
-
- private boolean started = false;
-
- private final boolean messageCounterEnabled;
-
- private boolean notificationsEnabled;
-
- private final Set<NotificationListener> listeners = new org.hornetq.utils.ConcurrentHashSet<NotificationListener>();
-
- private final ObjectNameBuilder objectNameBuilder;
-
- // Static --------------------------------------------------------
-
- private static void checkDefaultManagementClusterCredentials(final String user, final String password)
- {
- if (ConfigurationImpl.DEFAULT_MANAGEMENT_CLUSTER_USER.equals(user) && ConfigurationImpl.DEFAULT_MANAGEMENT_CLUSTER_PASSWORD.equals(password))
- {
- ManagementServiceImpl.log.warn("It has been detected that the cluster admin user and password which are used to " + "replicate management operation from one node to the other have not been changed from the installation default. "
- + "Please see the HornetQ user guide for instructions on how to do this.");
- }
- }
-
- // Constructor ----------------------------------------------------
-
- public ManagementServiceImpl(final MBeanServer mbeanServer, final Configuration configuration)
- {
- this.mbeanServer = mbeanServer;
- jmxManagementEnabled = configuration.isJMXManagementEnabled();
- messageCounterEnabled = configuration.isMessageCounterEnabled();
- managementAddress = configuration.getManagementAddress();
- managementNotificationAddress = configuration.getManagementNotificationAddress();
- managementClusterUser = configuration.getManagementClusterUser();
- managementClusterPassword = configuration.getManagementClusterPassword();
- managementRequestTimeout = configuration.getManagementRequestTimeout();
-
- ManagementServiceImpl.checkDefaultManagementClusterCredentials(managementClusterUser, managementClusterPassword);
-
- registry = new HashMap<String, Object>();
- broadcaster = new NotificationBroadcasterSupport();
- notificationsEnabled = true;
- objectNameBuilder = ObjectNameBuilder.create(configuration.getJMXDomain());
- }
-
- // Public --------------------------------------------------------
-
- // ManagementService implementation -------------------------
-
- public ObjectNameBuilder getObjectNameBuilder()
- {
- return objectNameBuilder;
- }
-
- public MessageCounterManager getMessageCounterManager()
- {
- return messageCounterManager;
- }
-
- public void setStorageManager(final StorageManager storageManager)
- {
- this.storageManager = storageManager;
- }
-
- public HornetQServerControlImpl registerServer(final PostOffice postOffice,
- final StorageManager storageManager,
- final Configuration configuration,
- final HierarchicalRepository<AddressSettings> addressSettingsRepository,
- final HierarchicalRepository<Set<Role>> securityRepository,
- final ResourceManager resourceManager,
- final RemotingService remotingService,
- final HornetQServer messagingServer,
- final QueueFactory queueFactory,
- final ScheduledExecutorService scheduledThreadPool,
- final PagingManager pagingManager,
- final boolean backup) throws Exception
- {
- this.postOffice = postOffice;
- this.addressSettingsRepository = addressSettingsRepository;
- this.securityRepository = securityRepository;
- this.storageManager = storageManager;
- this.messagingServer = messagingServer;
- this.pagingManager = pagingManager;
-
- messageCounterManager = new MessageCounterManagerImpl(scheduledThreadPool);
- messageCounterManager.setMaxDayCount(configuration.getMessageCounterMaxDayHistory());
- messageCounterManager.reschedule(configuration.getMessageCounterSamplePeriod());
-
- messagingServerControl = new HornetQServerControlImpl(postOffice,
- configuration,
- resourceManager,
- remotingService,
- messagingServer,
- messageCounterManager,
- storageManager,
- broadcaster);
- ObjectName objectName = objectNameBuilder.getHornetQServerObjectName();
- registerInJMX(objectName, messagingServerControl);
- registerInRegistry(ResourceNames.CORE_SERVER, messagingServerControl);
-
- return messagingServerControl;
- }
-
- public synchronized void unregisterServer() throws Exception
- {
- ObjectName objectName = objectNameBuilder.getHornetQServerObjectName();
- unregisterFromJMX(objectName);
- unregisterFromRegistry(ResourceNames.CORE_SERVER);
- }
-
- public synchronized void registerAddress(final SimpleString address) throws Exception
- {
- ObjectName objectName = objectNameBuilder.getAddressObjectName(address);
- AddressControlImpl addressControl = new AddressControlImpl(address,
- postOffice,
- pagingManager,
- storageManager,
- securityRepository);
-
- registerInJMX(objectName, addressControl);
-
- registerInRegistry(ResourceNames.CORE_ADDRESS + address, addressControl);
-
- if (ManagementServiceImpl.log.isDebugEnabled())
- {
- ManagementServiceImpl.log.debug("registered address " + objectName);
- }
- }
-
- public synchronized void unregisterAddress(final SimpleString address) throws Exception
- {
- ObjectName objectName = objectNameBuilder.getAddressObjectName(address);
-
- unregisterFromJMX(objectName);
- unregisterFromRegistry(ResourceNames.CORE_ADDRESS + address);
- }
-
- public synchronized void registerQueue(final Queue queue,
- final SimpleString address,
- final StorageManager storageManager) throws Exception
- {
- QueueControlImpl queueControl = new QueueControlImpl(queue,
- address.toString(),
- postOffice,
- storageManager,
- addressSettingsRepository);
- if (messageCounterManager != null)
- {
- MessageCounter counter = new MessageCounter(queue.getName().toString(),
- null,
- queueControl,
- false,
- queue.isDurable(),
- messageCounterManager.getMaxDayCount());
- queueControl.setMessageCounter(counter);
- messageCounterManager.registerMessageCounter(queue.getName().toString(), counter);
- }
- ObjectName objectName = objectNameBuilder.getQueueObjectName(address, queue.getName());
- registerInJMX(objectName, queueControl);
- registerInRegistry(ResourceNames.CORE_QUEUE + queue.getName(), queueControl);
-
- if (ManagementServiceImpl.log.isDebugEnabled())
- {
- ManagementServiceImpl.log.debug("registered queue " + objectName);
- }
- }
-
- public synchronized void unregisterQueue(final SimpleString name, final SimpleString address) throws Exception
- {
- ObjectName objectName = objectNameBuilder.getQueueObjectName(address, name);
- unregisterFromJMX(objectName);
- unregisterFromRegistry(ResourceNames.CORE_QUEUE + name);
- messageCounterManager.unregisterMessageCounter(name.toString());
- }
-
- public synchronized void registerDivert(final Divert divert, final DivertConfiguration config) throws Exception
- {
- ObjectName objectName = objectNameBuilder.getDivertObjectName(divert.getUniqueName());
- DivertControl divertControl = new DivertControlImpl(divert, storageManager, config);
- registerInJMX(objectName, new StandardMBean(divertControl, DivertControl.class));
- registerInRegistry(ResourceNames.CORE_DIVERT + config.getName(), divertControl);
-
- if (ManagementServiceImpl.log.isDebugEnabled())
- {
- ManagementServiceImpl.log.debug("registered divert " + objectName);
- }
- }
-
- public synchronized void unregisterDivert(final SimpleString name) throws Exception
- {
- ObjectName objectName = objectNameBuilder.getDivertObjectName(name);
- unregisterFromJMX(objectName);
- unregisterFromRegistry(ResourceNames.CORE_DIVERT + name);
- }
-
- public synchronized void registerAcceptor(final Acceptor acceptor, final TransportConfiguration configuration) throws Exception
- {
- ObjectName objectName = objectNameBuilder.getAcceptorObjectName(configuration.getName());
- AcceptorControl control = new AcceptorControlImpl(acceptor, storageManager, configuration);
- registerInJMX(objectName, new StandardMBean(control, AcceptorControl.class));
- registerInRegistry(ResourceNames.CORE_ACCEPTOR + configuration.getName(), control);
- }
-
- public void unregisterAcceptors()
- {
- List<String> acceptors = new ArrayList<String>();
- for (String resourceName : registry.keySet())
- {
- if (resourceName.startsWith(ResourceNames.CORE_ACCEPTOR))
- {
- acceptors.add(resourceName);
- }
- }
-
- for (String acceptor : acceptors)
- {
- String name = acceptor.substring(ResourceNames.CORE_ACCEPTOR.length());
- try
- {
- unregisterAcceptor(name);
- }
- catch (Exception e)
- {
- e.printStackTrace();
- }
- }
- }
-
- public synchronized void unregisterAcceptor(final String name) throws Exception
- {
- ObjectName objectName = objectNameBuilder.getAcceptorObjectName(name);
- unregisterFromJMX(objectName);
- unregisterFromRegistry(ResourceNames.CORE_ACCEPTOR + name);
- }
-
- public synchronized void registerBroadcastGroup(final BroadcastGroup broadcastGroup,
- final BroadcastGroupConfiguration configuration) throws Exception
- {
- broadcastGroup.setNotificationService(this);
- ObjectName objectName = objectNameBuilder.getBroadcastGroupObjectName(configuration.getName());
- BroadcastGroupControl control = new BroadcastGroupControlImpl(broadcastGroup, storageManager, configuration);
- registerInJMX(objectName, new StandardMBean(control, BroadcastGroupControl.class));
- registerInRegistry(ResourceNames.CORE_BROADCAST_GROUP + configuration.getName(), control);
- }
-
- public synchronized void unregisterBroadcastGroup(final String name) throws Exception
- {
- ObjectName objectName = objectNameBuilder.getBroadcastGroupObjectName(name);
- unregisterFromJMX(objectName);
- unregisterFromRegistry(ResourceNames.CORE_BROADCAST_GROUP + name);
- }
-
- public synchronized void registerDiscoveryGroup(final DiscoveryGroup discoveryGroup,
- final DiscoveryGroupConfiguration configuration) throws Exception
- {
- discoveryGroup.setNotificationService(this);
- ObjectName objectName = objectNameBuilder.getDiscoveryGroupObjectName(configuration.getName());
- DiscoveryGroupControl control = new DiscoveryGroupControlImpl(discoveryGroup, storageManager, configuration);
- registerInJMX(objectName, new StandardMBean(control, DiscoveryGroupControl.class));
- registerInRegistry(ResourceNames.CORE_DISCOVERY_GROUP + configuration.getName(), control);
- }
-
- public synchronized void unregisterDiscoveryGroup(final String name) throws Exception
- {
- ObjectName objectName = objectNameBuilder.getDiscoveryGroupObjectName(name);
- unregisterFromJMX(objectName);
- unregisterFromRegistry(ResourceNames.CORE_DISCOVERY_GROUP + name);
- }
-
- public synchronized void registerBridge(final Bridge bridge, final BridgeConfiguration configuration) throws Exception
- {
- bridge.setNotificationService(this);
- ObjectName objectName = objectNameBuilder.getBridgeObjectName(configuration.getName());
- BridgeControl control = new BridgeControlImpl(bridge, storageManager, configuration);
- registerInJMX(objectName, new StandardMBean(control, BridgeControl.class));
- registerInRegistry(ResourceNames.CORE_BRIDGE + configuration.getName(), control);
- }
-
- public synchronized void unregisterBridge(final String name) throws Exception
- {
- ObjectName objectName = objectNameBuilder.getBridgeObjectName(name);
- unregisterFromJMX(objectName);
- unregisterFromRegistry(ResourceNames.CORE_BRIDGE + name);
- }
-
- public synchronized void registerCluster(final ClusterConnection cluster,
- final ClusterConnectionConfiguration configuration) throws Exception
- {
- ObjectName objectName = objectNameBuilder.getClusterConnectionObjectName(configuration.getName());
- ClusterConnectionControl control = new ClusterConnectionControlImpl(cluster, storageManager, configuration);
- registerInJMX(objectName, new StandardMBean(control, ClusterConnectionControl.class));
- registerInRegistry(ResourceNames.CORE_CLUSTER_CONNECTION + configuration.getName(), control);
- }
-
- public synchronized void unregisterCluster(final String name) throws Exception
- {
- ObjectName objectName = objectNameBuilder.getClusterConnectionObjectName(name);
- unregisterFromJMX(objectName);
- unregisterFromRegistry(ResourceNames.CORE_CLUSTER_CONNECTION + name);
- }
-
- public ServerMessage handleMessage(final ServerMessage message) throws Exception
- {
- // a reply message is sent with the result stored in the message body.
- ServerMessage reply = new ServerMessageImpl(storageManager.generateUniqueID(), 512);
-
- String resourceName = message.getStringProperty(ManagementHelper.HDR_RESOURCE_NAME);
- if (ManagementServiceImpl.log.isDebugEnabled())
- {
- ManagementServiceImpl.log.debug("handling management message for " + resourceName);
- }
-
- String operation = message.getStringProperty(ManagementHelper.HDR_OPERATION_NAME);
-
- if (operation != null)
- {
- Object[] params = ManagementHelper.retrieveOperationParameters(message);
-
- if (params == null)
- {
- params = new Object[0];
- }
-
- try
- {
- Object result = invokeOperation(resourceName, operation, params);
-
- ManagementHelper.storeResult(reply, result);
-
- reply.putBooleanProperty(ManagementHelper.HDR_OPERATION_SUCCEEDED, true);
- }
- catch (Exception e)
- {
- ManagementServiceImpl.log.warn("exception while invoking " + operation + " on " + resourceName, e);
- reply.putBooleanProperty(ManagementHelper.HDR_OPERATION_SUCCEEDED, false);
- String exceptionMessage = e.getMessage();
- if (e instanceof InvocationTargetException)
- {
- exceptionMessage = ((InvocationTargetException)e).getTargetException().getMessage();
- }
- if (e != null)
- {
- ManagementHelper.storeResult(reply, exceptionMessage);
- }
- }
- }
- else
- {
- String attribute = message.getStringProperty(ManagementHelper.HDR_ATTRIBUTE);
-
- if (attribute != null)
- {
- try
- {
- Object result = getAttribute(resourceName, attribute);
-
- ManagementHelper.storeResult(reply, result);
- }
- catch (Exception e)
- {
- ManagementServiceImpl.log.warn("exception while retrieving attribute " + attribute +
- " on " +
- resourceName, e);
- reply.putBooleanProperty(ManagementHelper.HDR_OPERATION_SUCCEEDED, false);
- String exceptionMessage = e.getMessage();
- if (e instanceof InvocationTargetException)
- {
- exceptionMessage = ((InvocationTargetException)e).getTargetException().getMessage();
- }
- if (e != null)
- {
- ManagementHelper.storeResult(reply, exceptionMessage);
- }
- }
- }
- }
-
- return reply;
- }
-
- public Object getResource(final String resourceName)
- {
- return registry.get(resourceName);
- }
-
- public Object[] getResources(final Class<?> resourceType)
- {
- List<Object> resources = new ArrayList<Object>();
- for (Object entry : registry.values())
- {
- if (resourceType.isAssignableFrom(entry.getClass()))
- {
- resources.add(entry);
- }
- }
- return resources.toArray(new Object[resources.size()]);
- }
-
- private final Set<ObjectName> registeredNames = new HashSet<ObjectName>();
-
- public void registerInJMX(final ObjectName objectName, final Object managedResource) throws Exception
- {
- if (!jmxManagementEnabled)
- {
- return;
- }
-
- synchronized (mbeanServer)
- {
- unregisterFromJMX(objectName);
-
- mbeanServer.registerMBean(managedResource, objectName);
-
- registeredNames.add(objectName);
- }
- }
-
- public synchronized void registerInRegistry(final String resourceName, final Object managedResource)
- {
- unregisterFromRegistry(resourceName);
-
- registry.put(resourceName, managedResource);
- }
-
- public void unregisterFromRegistry(final String resourceName)
- {
- registry.remove(resourceName);
- }
-
- // the JMX unregistration is synchronized to avoid race conditions if 2 clients tries to
- // unregister the same resource (e.g. a queue) at the same time since unregisterMBean()
- // will throw an exception if the MBean has already been unregistered
- public void unregisterFromJMX(final ObjectName objectName) throws Exception
- {
- if (!jmxManagementEnabled)
- {
- return;
- }
-
- synchronized (mbeanServer)
- {
- if (mbeanServer.isRegistered(objectName))
- {
- mbeanServer.unregisterMBean(objectName);
-
- registeredNames.remove(objectName);
- }
- }
- }
-
- public void addNotificationListener(final NotificationListener listener)
- {
- listeners.add(listener);
- }
-
- public void removeNotificationListener(final NotificationListener listener)
- {
- listeners.remove(listener);
- }
-
- public SimpleString getManagementAddress()
- {
- return managementAddress;
- }
-
- public SimpleString getManagementNotificationAddress()
- {
- return managementNotificationAddress;
- }
-
- public String getClusterUser()
- {
- return managementClusterUser;
- }
-
- public String getClusterPassword()
- {
- return managementClusterPassword;
- }
-
- // HornetQComponent implementation -----------------------------
-
- public void start() throws Exception
- {
- if (messageCounterEnabled)
- {
- messageCounterManager.start();
- }
-
- started = true;
- }
-
- public synchronized void stop() throws Exception
- {
- Set<String> resourceNames = new HashSet<String>(registry.keySet());
-
- for (String resourceName : resourceNames)
- {
- unregisterFromRegistry(resourceName);
- }
-
- if (jmxManagementEnabled)
- {
- if (!registeredNames.isEmpty())
- {
- List<String> unexpectedResourceNames = new ArrayList<String>();
- for (String name : resourceNames)
- {
- // only addresses and queues should still be registered
- if (!(name.startsWith(ResourceNames.CORE_ADDRESS) || name.startsWith(ResourceNames.CORE_QUEUE)))
- {
- unexpectedResourceNames.add(name);
- }
- }
- if (!unexpectedResourceNames.isEmpty())
- {
- ManagementServiceImpl.log.warn("On ManagementService stop, there are " + unexpectedResourceNames.size() +
- " unexpected registered MBeans: " +
- unexpectedResourceNames);
- }
-
- for (ObjectName on : registeredNames)
- {
- try
- {
- mbeanServer.unregisterMBean(on);
- }
- catch (Exception ignore)
- {
- }
- }
- }
- }
-
- if (messageCounterManager != null)
- {
- messageCounterManager.stop();
-
- messageCounterManager.resetAllCounters();
-
- messageCounterManager.resetAllCounterHistories();
-
- messageCounterManager.clear();
- }
-
- registeredNames.clear();
-
- started = false;
- }
-
- public boolean isStarted()
- {
- return started;
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- public void sendNotification(final Notification notification) throws Exception
- {
- if (messagingServerControl != null && notificationsEnabled)
- {
- // This needs to be synchronized since we need to ensure notifications are processed in strict sequence
- synchronized (this)
- {
- // We also need to synchronize on the post office notification lock
- // otherwise we can get notifications arriving in wrong order / missing
- // if a notification occurs at same time as sendQueueInfoToQueue is processed
- synchronized (postOffice.getNotificationLock())
- {
-
- // First send to any local listeners
- for (NotificationListener listener : listeners)
- {
- try
- {
- listener.onNotification(notification);
- }
- catch (Exception e)
- {
- // Exception thrown from one listener should not stop execution of others
- ManagementServiceImpl.log.error("Failed to call listener", e);
- }
- }
-
- // start sending notification *messages* only when the *remoting service* if started
- if (messagingServer == null || !messagingServer.getRemotingService().isStarted())
- {
- return;
- }
-
- long messageID = storageManager.generateUniqueID();
-
- ServerMessage notificationMessage = new ServerMessageImpl(messageID, 512);
-
- // Notification messages are always durable so the user can choose whether to add a durable queue to
- // consume
- // them in
- notificationMessage.setDurable(true);
- notificationMessage.setAddress(managementNotificationAddress);
-
- TypedProperties notifProps;
- if (notification.getProperties() != null)
- {
- notifProps = new TypedProperties(notification.getProperties());
- }
- else
- {
- notifProps = new TypedProperties();
- }
-
- notifProps.putSimpleStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE,
- new SimpleString(notification.getType().toString()));
-
- notifProps.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());
-
- if (notification.getUID() != null)
- {
- notifProps.putSimpleStringProperty(new SimpleString("foobar"),
- new SimpleString(notification.getUID()));
- }
-
- notificationMessage.putTypedProperties(notifProps);
-
- postOffice.route(notificationMessage);
- }
- }
- }
- }
-
- public void enableNotifications(final boolean enabled)
- {
- notificationsEnabled = enabled;
- }
-
- public Object getAttribute(final String resourceName, final String attribute)
- {
- try
- {
- Object resource = registry.get(resourceName);
-
- if (resource == null)
- {
- throw new IllegalArgumentException("Cannot find resource with name " + resourceName);
- }
-
- Method method = null;
-
- String upperCaseAttribute = attribute.substring(0, 1).toUpperCase() + attribute.substring(1);
- try
- {
- method = resource.getClass().getMethod("get" + upperCaseAttribute, new Class[0]);
- }
- catch (NoSuchMethodException nsme)
- {
- try
- {
- method = resource.getClass().getMethod("is" + upperCaseAttribute, new Class[0]);
- }
- catch (NoSuchMethodException nsme2)
- {
- throw new IllegalArgumentException("no getter method for " + attribute);
- }
- }
- return method.invoke(resource, new Object[0]);
- }
- catch (Throwable t)
- {
- throw new IllegalStateException("Problem while retrieving attribute " + attribute, t);
- }
- }
-
- private Object invokeOperation(final String resourceName, final String operation, final Object[] params) throws Exception
- {
- Object resource = registry.get(resourceName);
-
- if (resource == null)
- {
- throw new IllegalArgumentException("Cannot find resource with name " + resourceName);
- }
-
- Method method = null;
-
- Method[] methods = resource.getClass().getMethods();
- for (Method m : methods)
- {
- if (m.getName().equals(operation) && m.getParameterTypes().length == params.length)
- {
- boolean match = true;
-
- Class<?>[] paramTypes = m.getParameterTypes();
-
- for (int i = 0; i < paramTypes.length; i++)
- {
- if (params[i] == null)
- {
- continue;
- }
- if (paramTypes[i].isAssignableFrom(params[i].getClass()) || paramTypes[i] == Long.TYPE &&
- params[i].getClass() == Integer.class ||
- paramTypes[i] == Double.TYPE &&
- params[i].getClass() == Integer.class ||
- paramTypes[i] == Long.TYPE &&
- params[i].getClass() == Long.class ||
- paramTypes[i] == Double.TYPE &&
- params[i].getClass() == Double.class ||
- paramTypes[i] == Integer.TYPE &&
- params[i].getClass() == Integer.class ||
- paramTypes[i] == Boolean.TYPE &&
- params[i].getClass() == Boolean.class)
- {
- // parameter match
- }
- else
- {
- match = false;
- break; // parameter check loop
- }
- }
-
- if (match)
- {
- method = m;
- break; // method match loop
- }
- }
- }
-
- if (method == null)
- {
- throw new IllegalArgumentException("no operation " + operation + "/" + params.length);
- }
-
- Object result = method.invoke(resource, params);
-
- return result;
- }
-
- // Inner classes -------------------------------------------------
-}
Modified: trunk/src/main/org/hornetq/core/message/Message.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/Message.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/message/Message.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -92,8 +92,6 @@
/**
* Sets whether this message is durable or not.
*
- * This method must not be called directly by HornetQ clients.
-
* @param durable {@code true} to flag this message as durable, {@code false} else
*/
void setDurable(boolean durable);
@@ -110,8 +108,6 @@
/**
* Sets the expiration of this message.
- * <br>
- * This method must not be called directly by HornetQ clients.
*
* @param expiration expiration time
*/
@@ -127,8 +123,6 @@
/**
* Sets the message timestamp.
- * <br>
- * This method must not be called directly by HornetQ clients.
*
* @param timestamp timestamp
*/
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -30,9 +30,6 @@
import org.hornetq.core.filter.Filter;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.management.ManagementService;
-import org.hornetq.core.management.Notification;
-import org.hornetq.core.management.NotificationListener;
import org.hornetq.core.management.NotificationType;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.paging.PageTransactionInfo;
@@ -56,6 +53,9 @@
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.RoutingContextImpl;
import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.core.server.management.ManagementService;
+import org.hornetq.core.server.management.Notification;
+import org.hornetq.core.server.management.NotificationListener;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.Transaction;
Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMAcceptor.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -19,13 +19,13 @@
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.management.Notification;
-import org.hornetq.core.management.NotificationService;
import org.hornetq.core.management.NotificationType;
import org.hornetq.core.remoting.spi.Acceptor;
import org.hornetq.core.remoting.spi.BufferHandler;
import org.hornetq.core.remoting.spi.Connection;
import org.hornetq.core.remoting.spi.ConnectionLifeCycleListener;
+import org.hornetq.core.server.management.Notification;
+import org.hornetq.core.server.management.NotificationService;
import org.hornetq.utils.ConfigurationHelper;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.OrderedExecutorFactory;
Modified: trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/remoting/server/impl/RemotingServiceImpl.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -28,7 +28,6 @@
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.management.ManagementService;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
import org.hornetq.core.remoting.Interceptor;
@@ -46,6 +45,7 @@
import org.hornetq.core.remoting.spi.ConnectionLifeCycleListener;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.impl.HornetQPacketHandler;
+import org.hornetq.core.server.management.ManagementService;
import org.hornetq.utils.ConfigurationHelper;
/**
Modified: trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -13,8 +13,8 @@
package org.hornetq.core.remoting.spi;
-import org.hornetq.core.management.NotificationService;
import org.hornetq.core.server.HornetQComponent;
+import org.hornetq.core.server.management.NotificationService;
/**
* An Acceptor is used by the Remoting Service to allow clients to connect. It should take care of dispatching client requests
Modified: trunk/src/main/org/hornetq/core/security/impl/SecurityStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/security/impl/SecurityStoreImpl.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/security/impl/SecurityStoreImpl.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -22,14 +22,14 @@
import org.hornetq.core.client.management.impl.ManagementHelper;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.management.Notification;
-import org.hornetq.core.management.NotificationService;
import org.hornetq.core.management.NotificationType;
import org.hornetq.core.security.CheckType;
import org.hornetq.core.security.HornetQSecurityManager;
import org.hornetq.core.security.Role;
import org.hornetq.core.security.SecurityStore;
import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.management.Notification;
+import org.hornetq.core.server.management.NotificationService;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.HierarchicalRepositoryChangeListener;
import org.hornetq.utils.ConcurrentHashSet;
Modified: trunk/src/main/org/hornetq/core/server/HornetQServer.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/HornetQServer.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/server/HornetQServer.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -19,7 +19,6 @@
import javax.management.MBeanServer;
import org.hornetq.core.config.Configuration;
-import org.hornetq.core.management.ManagementService;
import org.hornetq.core.management.impl.HornetQServerControlImpl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
@@ -33,6 +32,7 @@
import org.hornetq.core.security.Role;
import org.hornetq.core.server.cluster.ClusterManager;
import org.hornetq.core.server.group.GroupingHandler;
+import org.hornetq.core.server.management.ManagementService;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.ResourceManager;
Modified: trunk/src/main/org/hornetq/core/server/cluster/Bridge.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/Bridge.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/server/cluster/Bridge.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -13,11 +13,11 @@
package org.hornetq.core.server.cluster;
-import org.hornetq.core.management.NotificationService;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.server.Consumer;
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.management.NotificationService;
import org.hornetq.utils.SimpleString;
/**
Modified: trunk/src/main/org/hornetq/core/server/cluster/BroadcastGroup.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/BroadcastGroup.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/server/cluster/BroadcastGroup.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -14,8 +14,8 @@
package org.hornetq.core.server.cluster;
import org.hornetq.core.config.TransportConfiguration;
-import org.hornetq.core.management.NotificationService;
import org.hornetq.core.server.HornetQComponent;
+import org.hornetq.core.server.management.NotificationService;
import org.hornetq.utils.Pair;
/**
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -34,8 +34,6 @@
import org.hornetq.core.filter.Filter;
import org.hornetq.core.filter.impl.FilterImpl;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.management.Notification;
-import org.hornetq.core.management.NotificationService;
import org.hornetq.core.management.NotificationType;
import org.hornetq.core.management.ResourceNames;
import org.hornetq.core.message.Message;
@@ -50,6 +48,8 @@
import org.hornetq.core.server.cluster.Bridge;
import org.hornetq.core.server.cluster.MessageFlowRecord;
import org.hornetq.core.server.cluster.Transformer;
+import org.hornetq.core.server.management.Notification;
+import org.hornetq.core.server.management.NotificationService;
import org.hornetq.utils.Future;
import org.hornetq.utils.Pair;
import org.hornetq.utils.SimpleString;
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/BroadcastGroupImpl.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -24,10 +24,10 @@
import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.management.Notification;
-import org.hornetq.core.management.NotificationService;
import org.hornetq.core.management.NotificationType;
import org.hornetq.core.server.cluster.BroadcastGroup;
+import org.hornetq.core.server.management.Notification;
+import org.hornetq.core.server.management.NotificationService;
import org.hornetq.utils.Pair;
import org.hornetq.utils.SimpleString;
import org.hornetq.utils.TypedProperties;
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -16,7 +16,11 @@
import static org.hornetq.core.management.NotificationType.CONSUMER_CLOSED;
import static org.hornetq.core.management.NotificationType.CONSUMER_CREATED;
-import java.util.*;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ScheduledExecutorService;
@@ -27,8 +31,6 @@
import org.hornetq.core.cluster.DiscoveryListener;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.management.ManagementService;
-import org.hornetq.core.management.Notification;
import org.hornetq.core.management.NotificationType;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
@@ -42,6 +44,8 @@
import org.hornetq.core.server.cluster.RemoteQueueBinding;
import org.hornetq.core.server.group.impl.Proposal;
import org.hornetq.core.server.group.impl.Response;
+import org.hornetq.core.server.management.ManagementService;
+import org.hornetq.core.server.management.Notification;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.Pair;
import org.hornetq.utils.SimpleString;
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -34,7 +34,6 @@
import org.hornetq.core.config.cluster.ClusterConnectionConfiguration;
import org.hornetq.core.config.cluster.DiscoveryGroupConfiguration;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.management.ManagementService;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.HornetQServer;
@@ -44,6 +43,7 @@
import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.cluster.ClusterManager;
import org.hornetq.core.server.cluster.Transformer;
+import org.hornetq.core.server.management.ManagementService;
import org.hornetq.utils.Pair;
import org.hornetq.utils.SimpleString;
import org.hornetq.utils.UUID;
Modified: trunk/src/main/org/hornetq/core/server/group/GroupingHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/group/GroupingHandler.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/server/group/GroupingHandler.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -12,10 +12,10 @@
*/
package org.hornetq.core.server.group;
-import org.hornetq.core.management.NotificationListener;
import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.server.group.impl.Proposal;
import org.hornetq.core.server.group.impl.Response;
+import org.hornetq.core.server.management.NotificationListener;
import org.hornetq.utils.SimpleString;
/**
Modified: trunk/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/server/group/impl/LocalGroupingHandler.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -18,12 +18,12 @@
import org.hornetq.core.client.management.impl.ManagementHelper;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.management.ManagementService;
-import org.hornetq.core.management.Notification;
import org.hornetq.core.management.NotificationType;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.BindingType;
import org.hornetq.core.server.group.GroupingHandler;
+import org.hornetq.core.server.management.ManagementService;
+import org.hornetq.core.server.management.Notification;
import org.hornetq.utils.SimpleString;
import org.hornetq.utils.TypedProperties;
Modified: trunk/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/server/group/impl/RemoteGroupingHandler.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -24,11 +24,11 @@
import java.util.logging.Logger;
import org.hornetq.core.client.management.impl.ManagementHelper;
-import org.hornetq.core.management.ManagementService;
-import org.hornetq.core.management.Notification;
import org.hornetq.core.management.NotificationType;
import org.hornetq.core.postoffice.BindingType;
import org.hornetq.core.server.group.GroupingHandler;
+import org.hornetq.core.server.management.ManagementService;
+import org.hornetq.core.server.management.Notification;
import org.hornetq.utils.SimpleString;
import org.hornetq.utils.TypedProperties;
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -55,9 +55,7 @@
import org.hornetq.core.journal.impl.SyncSpeedTest;
import org.hornetq.core.logging.LogDelegateFactory;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.management.ManagementService;
import org.hornetq.core.management.impl.HornetQServerControlImpl;
-import org.hornetq.core.management.impl.ManagementServiceImpl;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
@@ -102,6 +100,8 @@
import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
import org.hornetq.core.server.group.impl.LocalGroupingHandler;
import org.hornetq.core.server.group.impl.RemoteGroupingHandler;
+import org.hornetq.core.server.management.ManagementService;
+import org.hornetq.core.server.management.impl.ManagementServiceImpl;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.settings.impl.HierarchicalObjectRepository;
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -28,8 +28,6 @@
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.management.ManagementService;
-import org.hornetq.core.management.Notification;
import org.hornetq.core.management.NotificationType;
import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.persistence.StorageManager;
@@ -46,6 +44,8 @@
import org.hornetq.core.server.ServerConsumer;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.management.ManagementService;
+import org.hornetq.core.server.management.Notification;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.utils.Future;
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -35,8 +35,6 @@
import org.hornetq.core.filter.impl.FilterImpl;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.management.ManagementService;
-import org.hornetq.core.management.Notification;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.Binding;
@@ -93,6 +91,8 @@
import org.hornetq.core.server.ServerConsumer;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.ServerSession;
+import org.hornetq.core.server.management.ManagementService;
+import org.hornetq.core.server.management.Notification;
import org.hornetq.core.transaction.ResourceManager;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.impl.TransactionImpl;
Copied: trunk/src/main/org/hornetq/core/server/management/ManagementService.java (from rev 8663, trunk/src/main/org/hornetq/core/management/ManagementService.java)
===================================================================
--- trunk/src/main/org/hornetq/core/server/management/ManagementService.java (rev 0)
+++ trunk/src/main/org/hornetq/core/server/management/ManagementService.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server.management;
+
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+
+import javax.management.ObjectName;
+
+import org.hornetq.core.cluster.DiscoveryGroup;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.config.cluster.BridgeConfiguration;
+import org.hornetq.core.config.cluster.BroadcastGroupConfiguration;
+import org.hornetq.core.config.cluster.ClusterConnectionConfiguration;
+import org.hornetq.core.config.cluster.DiscoveryGroupConfiguration;
+import org.hornetq.core.config.cluster.DivertConfiguration;
+import org.hornetq.core.management.ObjectNameBuilder;
+import org.hornetq.core.management.impl.HornetQServerControlImpl;
+import org.hornetq.core.messagecounter.MessageCounterManager;
+import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.remoting.server.RemotingService;
+import org.hornetq.core.remoting.spi.Acceptor;
+import org.hornetq.core.security.Role;
+import org.hornetq.core.server.Divert;
+import org.hornetq.core.server.HornetQComponent;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.QueueFactory;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.cluster.Bridge;
+import org.hornetq.core.server.cluster.BroadcastGroup;
+import org.hornetq.core.server.cluster.ClusterConnection;
+import org.hornetq.core.settings.HierarchicalRepository;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.core.transaction.ResourceManager;
+import org.hornetq.utils.SimpleString;
+
+/**
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+public interface ManagementService extends NotificationService, HornetQComponent
+{
+ // Configuration
+
+ MessageCounterManager getMessageCounterManager();
+
+ String getClusterUser();
+
+ String getClusterPassword();
+
+ SimpleString getManagementAddress();
+
+ SimpleString getManagementNotificationAddress();
+
+ ObjectNameBuilder getObjectNameBuilder();
+
+ // Resource Registration
+
+ void setStorageManager(StorageManager storageManager);
+
+ HornetQServerControlImpl registerServer(PostOffice postOffice,
+ StorageManager storageManager,
+ Configuration configuration,
+ HierarchicalRepository<AddressSettings> addressSettingsRepository,
+ HierarchicalRepository<Set<Role>> securityRepository,
+ ResourceManager resourceManager,
+ RemotingService remotingService,
+ HornetQServer messagingServer,
+ QueueFactory queueFactory,
+ ScheduledExecutorService scheduledThreadPool,
+ final PagingManager pagingManager,
+ boolean backup) throws Exception;
+
+ void unregisterServer() throws Exception;
+
+ void registerInJMX(ObjectName objectName, Object managedResource) throws Exception;
+
+ void unregisterFromJMX(final ObjectName objectName) throws Exception;
+
+ void registerInRegistry(String resourceName, Object managedResource);
+
+ void unregisterFromRegistry(final String resourceName);
+
+ void registerAddress(SimpleString address) throws Exception;
+
+ void unregisterAddress(SimpleString address) throws Exception;
+
+ void registerQueue(Queue queue, SimpleString address, StorageManager storageManager) throws Exception;
+
+ void unregisterQueue(SimpleString name, SimpleString address) throws Exception;
+
+ void registerAcceptor(Acceptor acceptor, TransportConfiguration configuration) throws Exception;
+
+ void unregisterAcceptors();
+
+ void registerDivert(Divert divert, DivertConfiguration config) throws Exception;
+
+ void unregisterDivert(SimpleString name) throws Exception;
+
+ void registerBroadcastGroup(BroadcastGroup broadcastGroup, BroadcastGroupConfiguration configuration) throws Exception;
+
+ void unregisterBroadcastGroup(String name) throws Exception;
+
+ void registerDiscoveryGroup(DiscoveryGroup discoveryGroup, DiscoveryGroupConfiguration configuration) throws Exception;
+
+ void unregisterDiscoveryGroup(String name) throws Exception;
+
+ void registerBridge(Bridge bridge, BridgeConfiguration configuration) throws Exception;
+
+ void unregisterBridge(String name) throws Exception;
+
+ void registerCluster(ClusterConnection cluster, ClusterConnectionConfiguration configuration) throws Exception;
+
+ void unregisterCluster(String name) throws Exception;
+
+ Object getResource(String resourceName);
+
+ Object[] getResources(Class<?> resourceType);
+
+ ServerMessage handleMessage(ServerMessage message) throws Exception;
+}
Copied: trunk/src/main/org/hornetq/core/server/management/Notification.java (from rev 8663, trunk/src/main/org/hornetq/core/management/Notification.java)
===================================================================
--- trunk/src/main/org/hornetq/core/server/management/Notification.java (rev 0)
+++ trunk/src/main/org/hornetq/core/server/management/Notification.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -0,0 +1,63 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server.management;
+
+import org.hornetq.core.management.NotificationType;
+import org.hornetq.utils.TypedProperties;
+
+/**
+ * A Notification
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ * Created 22 Jan 2009 16:41:12
+ *
+ *
+ */
+public class Notification
+{
+ private final NotificationType type;
+
+ private final TypedProperties properties;
+
+ private final String uid;
+
+ public Notification(final String uid, final NotificationType type, final TypedProperties properties)
+ {
+ this.uid = uid;
+ this.type = type;
+ this.properties = properties;
+ }
+
+ public NotificationType getType()
+ {
+ return type;
+ }
+
+ public TypedProperties getProperties()
+ {
+ return properties;
+ }
+
+ public String getUID()
+ {
+ return uid;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Notification[uid=" + uid + ", type=" + type + ", properties=" + properties + "]";
+ }
+}
Copied: trunk/src/main/org/hornetq/core/server/management/NotificationListener.java (from rev 8663, trunk/src/main/org/hornetq/core/management/NotificationListener.java)
===================================================================
--- trunk/src/main/org/hornetq/core/server/management/NotificationListener.java (rev 0)
+++ trunk/src/main/org/hornetq/core/server/management/NotificationListener.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server.management;
+
+
+/**
+ * A NotificationListener
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ * Created 22 Jan 2009 16:48:27
+ *
+ *
+ */
+public interface NotificationListener
+{
+ void onNotification(Notification notification);
+}
Copied: trunk/src/main/org/hornetq/core/server/management/NotificationService.java (from rev 8663, trunk/src/main/org/hornetq/core/management/NotificationService.java)
===================================================================
--- trunk/src/main/org/hornetq/core/server/management/NotificationService.java (rev 0)
+++ trunk/src/main/org/hornetq/core/server/management/NotificationService.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server.management;
+
+import org.hornetq.core.client.management.impl.ManagementHelper;
+
+/**
+ * A NotificationService
+ *
+ * @author jmesnil
+ *
+ *
+ */
+public interface NotificationService
+{
+ /**
+ * the message corresponding to a notification will always contain the properties:
+ * <ul>
+ * <li><code>ManagementHelper.HDR_NOTIFICATION_TYPE</code> - the type of notification (SimpleString)</li>
+ * <li><code>ManagementHelper.HDR_NOTIFICATION_MESSAGE</code> - a message contextual to the notification (SimpleString)</li>
+ * <li><code>ManagementHelper.HDR_NOTIFICATION_TIMESTAMP</code> - the timestamp when the notification occured (long)</li>
+ * </ul>
+ *
+ * in addition to the properties defined in <code>props</code>
+ *
+ * @see ManagementHelper
+ */
+ void sendNotification(Notification notification) throws Exception;
+
+ void enableNotifications(boolean enable);
+
+ void addNotificationListener(NotificationListener listener);
+
+ void removeNotificationListener(NotificationListener listener);
+
+}
Copied: trunk/src/main/org/hornetq/core/server/management/Operation.java (from rev 8663, trunk/src/main/org/hornetq/core/management/Operation.java)
===================================================================
--- trunk/src/main/org/hornetq/core/server/management/Operation.java (rev 0)
+++ trunk/src/main/org/hornetq/core/server/management/Operation.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server.management;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Inherited;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+import javax.management.MBeanOperationInfo;
+
+/**
+ * Info for a MBean Operation.
+ *
+ * This annotation is used only for methods which can be invoked
+ * through a GUI.
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+(a)Retention(RetentionPolicy.RUNTIME)
+(a)Target(ElementType.METHOD)
+@Inherited
+public @interface Operation
+{
+ String desc();
+
+ int impact() default MBeanOperationInfo.INFO;
+}
Copied: trunk/src/main/org/hornetq/core/server/management/Parameter.java (from rev 8663, trunk/src/main/org/hornetq/core/management/Parameter.java)
===================================================================
--- trunk/src/main/org/hornetq/core/server/management/Parameter.java (rev 0)
+++ trunk/src/main/org/hornetq/core/server/management/Parameter.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -0,0 +1,36 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server.management;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Info for a MBean Operation Parameter.
+ *
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ *
+ * @version <tt>$Revision$</tt>
+ *
+ */
+(a)Retention(RetentionPolicy.RUNTIME)
+(a)Target(ElementType.PARAMETER)
+public @interface Parameter
+{
+ String name();
+
+ String desc() default "N/A";
+}
Copied: trunk/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java (from rev 8663, trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java)
===================================================================
--- trunk/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java (rev 0)
+++ trunk/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -0,0 +1,868 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server.management.impl;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ScheduledExecutorService;
+
+import javax.management.MBeanServer;
+import javax.management.NotificationBroadcasterSupport;
+import javax.management.ObjectName;
+import javax.management.StandardMBean;
+
+import org.hornetq.core.client.management.impl.ManagementHelper;
+import org.hornetq.core.cluster.DiscoveryGroup;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.config.cluster.BridgeConfiguration;
+import org.hornetq.core.config.cluster.BroadcastGroupConfiguration;
+import org.hornetq.core.config.cluster.ClusterConnectionConfiguration;
+import org.hornetq.core.config.cluster.DiscoveryGroupConfiguration;
+import org.hornetq.core.config.cluster.DivertConfiguration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.management.AcceptorControl;
+import org.hornetq.core.management.BridgeControl;
+import org.hornetq.core.management.BroadcastGroupControl;
+import org.hornetq.core.management.ClusterConnectionControl;
+import org.hornetq.core.management.DiscoveryGroupControl;
+import org.hornetq.core.management.DivertControl;
+import org.hornetq.core.management.ObjectNameBuilder;
+import org.hornetq.core.management.ResourceNames;
+import org.hornetq.core.management.impl.AcceptorControlImpl;
+import org.hornetq.core.management.impl.AddressControlImpl;
+import org.hornetq.core.management.impl.BridgeControlImpl;
+import org.hornetq.core.management.impl.BroadcastGroupControlImpl;
+import org.hornetq.core.management.impl.ClusterConnectionControlImpl;
+import org.hornetq.core.management.impl.DiscoveryGroupControlImpl;
+import org.hornetq.core.management.impl.DivertControlImpl;
+import org.hornetq.core.management.impl.HornetQServerControlImpl;
+import org.hornetq.core.management.impl.QueueControlImpl;
+import org.hornetq.core.messagecounter.MessageCounter;
+import org.hornetq.core.messagecounter.MessageCounterManager;
+import org.hornetq.core.messagecounter.impl.MessageCounterManagerImpl;
+import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.remoting.server.RemotingService;
+import org.hornetq.core.remoting.spi.Acceptor;
+import org.hornetq.core.security.Role;
+import org.hornetq.core.server.Divert;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.QueueFactory;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.cluster.Bridge;
+import org.hornetq.core.server.cluster.BroadcastGroup;
+import org.hornetq.core.server.cluster.ClusterConnection;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.core.server.management.ManagementService;
+import org.hornetq.core.server.management.Notification;
+import org.hornetq.core.server.management.NotificationListener;
+import org.hornetq.core.settings.HierarchicalRepository;
+import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.core.transaction.ResourceManager;
+import org.hornetq.utils.SimpleString;
+import org.hornetq.utils.TypedProperties;
+
+/*
+ * @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
+ * @author <a href="mailto:fox@redhat.com">Tim Fox</a>
+ *
+ * @version <tt>$Revision$</tt>
+ */
+public class ManagementServiceImpl implements ManagementService
+{
+ // Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(ManagementServiceImpl.class);
+
+ private final MBeanServer mbeanServer;
+
+ private final boolean jmxManagementEnabled;
+
+ private final Map<String, Object> registry;
+
+ private final NotificationBroadcasterSupport broadcaster;
+
+ private PostOffice postOffice;
+
+ private PagingManager pagingManager;
+
+ private StorageManager storageManager;
+
+ private HornetQServer messagingServer;
+
+ private HierarchicalRepository<Set<Role>> securityRepository;
+
+ private HierarchicalRepository<AddressSettings> addressSettingsRepository;
+
+ private HornetQServerControlImpl messagingServerControl;
+
+ private MessageCounterManager messageCounterManager;
+
+ private final SimpleString managementNotificationAddress;
+
+ private final SimpleString managementAddress;
+
+ private final String managementClusterUser;
+
+ private final String managementClusterPassword;
+
+ private boolean started = false;
+
+ private final boolean messageCounterEnabled;
+
+ private boolean notificationsEnabled;
+
+ private final Set<NotificationListener> listeners = new org.hornetq.utils.ConcurrentHashSet<NotificationListener>();
+
+ private final ObjectNameBuilder objectNameBuilder;
+
+ // Static --------------------------------------------------------
+
+ private static void checkDefaultManagementClusterCredentials(final String user, final String password)
+ {
+ if (ConfigurationImpl.DEFAULT_MANAGEMENT_CLUSTER_USER.equals(user) && ConfigurationImpl.DEFAULT_MANAGEMENT_CLUSTER_PASSWORD.equals(password))
+ {
+ ManagementServiceImpl.log.warn("It has been detected that the cluster admin user and password which are used to " + "replicate management operation from one node to the other have not been changed from the installation default. "
+ + "Please see the HornetQ user guide for instructions on how to do this.");
+ }
+ }
+
+ // Constructor ----------------------------------------------------
+
+ public ManagementServiceImpl(final MBeanServer mbeanServer, final Configuration configuration)
+ {
+ this.mbeanServer = mbeanServer;
+ jmxManagementEnabled = configuration.isJMXManagementEnabled();
+ messageCounterEnabled = configuration.isMessageCounterEnabled();
+ managementAddress = configuration.getManagementAddress();
+ managementNotificationAddress = configuration.getManagementNotificationAddress();
+ managementClusterUser = configuration.getManagementClusterUser();
+ managementClusterPassword = configuration.getManagementClusterPassword();
+
+ ManagementServiceImpl.checkDefaultManagementClusterCredentials(managementClusterUser, managementClusterPassword);
+
+ registry = new HashMap<String, Object>();
+ broadcaster = new NotificationBroadcasterSupport();
+ notificationsEnabled = true;
+ objectNameBuilder = ObjectNameBuilder.create(configuration.getJMXDomain());
+ }
+
+ // Public --------------------------------------------------------
+
+ // ManagementService implementation -------------------------
+
+ public ObjectNameBuilder getObjectNameBuilder()
+ {
+ return objectNameBuilder;
+ }
+
+ public MessageCounterManager getMessageCounterManager()
+ {
+ return messageCounterManager;
+ }
+
+ public void setStorageManager(final StorageManager storageManager)
+ {
+ this.storageManager = storageManager;
+ }
+
+ public HornetQServerControlImpl registerServer(final PostOffice postOffice,
+ final StorageManager storageManager,
+ final Configuration configuration,
+ final HierarchicalRepository<AddressSettings> addressSettingsRepository,
+ final HierarchicalRepository<Set<Role>> securityRepository,
+ final ResourceManager resourceManager,
+ final RemotingService remotingService,
+ final HornetQServer messagingServer,
+ final QueueFactory queueFactory,
+ final ScheduledExecutorService scheduledThreadPool,
+ final PagingManager pagingManager,
+ final boolean backup) throws Exception
+ {
+ this.postOffice = postOffice;
+ this.addressSettingsRepository = addressSettingsRepository;
+ this.securityRepository = securityRepository;
+ this.storageManager = storageManager;
+ this.messagingServer = messagingServer;
+ this.pagingManager = pagingManager;
+
+ messageCounterManager = new MessageCounterManagerImpl(scheduledThreadPool);
+ messageCounterManager.setMaxDayCount(configuration.getMessageCounterMaxDayHistory());
+ messageCounterManager.reschedule(configuration.getMessageCounterSamplePeriod());
+
+ messagingServerControl = new HornetQServerControlImpl(postOffice,
+ configuration,
+ resourceManager,
+ remotingService,
+ messagingServer,
+ messageCounterManager,
+ storageManager,
+ broadcaster);
+ ObjectName objectName = objectNameBuilder.getHornetQServerObjectName();
+ registerInJMX(objectName, messagingServerControl);
+ registerInRegistry(ResourceNames.CORE_SERVER, messagingServerControl);
+
+ return messagingServerControl;
+ }
+
+ public synchronized void unregisterServer() throws Exception
+ {
+ ObjectName objectName = objectNameBuilder.getHornetQServerObjectName();
+ unregisterFromJMX(objectName);
+ unregisterFromRegistry(ResourceNames.CORE_SERVER);
+ }
+
+ public synchronized void registerAddress(final SimpleString address) throws Exception
+ {
+ ObjectName objectName = objectNameBuilder.getAddressObjectName(address);
+ AddressControlImpl addressControl = new AddressControlImpl(address,
+ postOffice,
+ pagingManager,
+ storageManager,
+ securityRepository);
+
+ registerInJMX(objectName, addressControl);
+
+ registerInRegistry(ResourceNames.CORE_ADDRESS + address, addressControl);
+
+ if (ManagementServiceImpl.log.isDebugEnabled())
+ {
+ ManagementServiceImpl.log.debug("registered address " + objectName);
+ }
+ }
+
+ public synchronized void unregisterAddress(final SimpleString address) throws Exception
+ {
+ ObjectName objectName = objectNameBuilder.getAddressObjectName(address);
+
+ unregisterFromJMX(objectName);
+ unregisterFromRegistry(ResourceNames.CORE_ADDRESS + address);
+ }
+
+ public synchronized void registerQueue(final Queue queue,
+ final SimpleString address,
+ final StorageManager storageManager) throws Exception
+ {
+ QueueControlImpl queueControl = new QueueControlImpl(queue,
+ address.toString(),
+ postOffice,
+ storageManager,
+ addressSettingsRepository);
+ if (messageCounterManager != null)
+ {
+ MessageCounter counter = new MessageCounter(queue.getName().toString(),
+ null,
+ queueControl,
+ false,
+ queue.isDurable(),
+ messageCounterManager.getMaxDayCount());
+ queueControl.setMessageCounter(counter);
+ messageCounterManager.registerMessageCounter(queue.getName().toString(), counter);
+ }
+ ObjectName objectName = objectNameBuilder.getQueueObjectName(address, queue.getName());
+ registerInJMX(objectName, queueControl);
+ registerInRegistry(ResourceNames.CORE_QUEUE + queue.getName(), queueControl);
+
+ if (ManagementServiceImpl.log.isDebugEnabled())
+ {
+ ManagementServiceImpl.log.debug("registered queue " + objectName);
+ }
+ }
+
+ public synchronized void unregisterQueue(final SimpleString name, final SimpleString address) throws Exception
+ {
+ ObjectName objectName = objectNameBuilder.getQueueObjectName(address, name);
+ unregisterFromJMX(objectName);
+ unregisterFromRegistry(ResourceNames.CORE_QUEUE + name);
+ messageCounterManager.unregisterMessageCounter(name.toString());
+ }
+
+ public synchronized void registerDivert(final Divert divert, final DivertConfiguration config) throws Exception
+ {
+ ObjectName objectName = objectNameBuilder.getDivertObjectName(divert.getUniqueName());
+ DivertControl divertControl = new DivertControlImpl(divert, storageManager, config);
+ registerInJMX(objectName, new StandardMBean(divertControl, DivertControl.class));
+ registerInRegistry(ResourceNames.CORE_DIVERT + config.getName(), divertControl);
+
+ if (ManagementServiceImpl.log.isDebugEnabled())
+ {
+ ManagementServiceImpl.log.debug("registered divert " + objectName);
+ }
+ }
+
+ public synchronized void unregisterDivert(final SimpleString name) throws Exception
+ {
+ ObjectName objectName = objectNameBuilder.getDivertObjectName(name);
+ unregisterFromJMX(objectName);
+ unregisterFromRegistry(ResourceNames.CORE_DIVERT + name);
+ }
+
+ public synchronized void registerAcceptor(final Acceptor acceptor, final TransportConfiguration configuration) throws Exception
+ {
+ ObjectName objectName = objectNameBuilder.getAcceptorObjectName(configuration.getName());
+ AcceptorControl control = new AcceptorControlImpl(acceptor, storageManager, configuration);
+ registerInJMX(objectName, new StandardMBean(control, AcceptorControl.class));
+ registerInRegistry(ResourceNames.CORE_ACCEPTOR + configuration.getName(), control);
+ }
+
+ public void unregisterAcceptors()
+ {
+ List<String> acceptors = new ArrayList<String>();
+ for (String resourceName : registry.keySet())
+ {
+ if (resourceName.startsWith(ResourceNames.CORE_ACCEPTOR))
+ {
+ acceptors.add(resourceName);
+ }
+ }
+
+ for (String acceptor : acceptors)
+ {
+ String name = acceptor.substring(ResourceNames.CORE_ACCEPTOR.length());
+ try
+ {
+ unregisterAcceptor(name);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public synchronized void unregisterAcceptor(final String name) throws Exception
+ {
+ ObjectName objectName = objectNameBuilder.getAcceptorObjectName(name);
+ unregisterFromJMX(objectName);
+ unregisterFromRegistry(ResourceNames.CORE_ACCEPTOR + name);
+ }
+
+ public synchronized void registerBroadcastGroup(final BroadcastGroup broadcastGroup,
+ final BroadcastGroupConfiguration configuration) throws Exception
+ {
+ broadcastGroup.setNotificationService(this);
+ ObjectName objectName = objectNameBuilder.getBroadcastGroupObjectName(configuration.getName());
+ BroadcastGroupControl control = new BroadcastGroupControlImpl(broadcastGroup, storageManager, configuration);
+ registerInJMX(objectName, new StandardMBean(control, BroadcastGroupControl.class));
+ registerInRegistry(ResourceNames.CORE_BROADCAST_GROUP + configuration.getName(), control);
+ }
+
+ public synchronized void unregisterBroadcastGroup(final String name) throws Exception
+ {
+ ObjectName objectName = objectNameBuilder.getBroadcastGroupObjectName(name);
+ unregisterFromJMX(objectName);
+ unregisterFromRegistry(ResourceNames.CORE_BROADCAST_GROUP + name);
+ }
+
+ public synchronized void registerDiscoveryGroup(final DiscoveryGroup discoveryGroup,
+ final DiscoveryGroupConfiguration configuration) throws Exception
+ {
+ discoveryGroup.setNotificationService(this);
+ ObjectName objectName = objectNameBuilder.getDiscoveryGroupObjectName(configuration.getName());
+ DiscoveryGroupControl control = new DiscoveryGroupControlImpl(discoveryGroup, storageManager, configuration);
+ registerInJMX(objectName, new StandardMBean(control, DiscoveryGroupControl.class));
+ registerInRegistry(ResourceNames.CORE_DISCOVERY_GROUP + configuration.getName(), control);
+ }
+
+ public synchronized void unregisterDiscoveryGroup(final String name) throws Exception
+ {
+ ObjectName objectName = objectNameBuilder.getDiscoveryGroupObjectName(name);
+ unregisterFromJMX(objectName);
+ unregisterFromRegistry(ResourceNames.CORE_DISCOVERY_GROUP + name);
+ }
+
+ public synchronized void registerBridge(final Bridge bridge, final BridgeConfiguration configuration) throws Exception
+ {
+ bridge.setNotificationService(this);
+ ObjectName objectName = objectNameBuilder.getBridgeObjectName(configuration.getName());
+ BridgeControl control = new BridgeControlImpl(bridge, storageManager, configuration);
+ registerInJMX(objectName, new StandardMBean(control, BridgeControl.class));
+ registerInRegistry(ResourceNames.CORE_BRIDGE + configuration.getName(), control);
+ }
+
+ public synchronized void unregisterBridge(final String name) throws Exception
+ {
+ ObjectName objectName = objectNameBuilder.getBridgeObjectName(name);
+ unregisterFromJMX(objectName);
+ unregisterFromRegistry(ResourceNames.CORE_BRIDGE + name);
+ }
+
+ public synchronized void registerCluster(final ClusterConnection cluster,
+ final ClusterConnectionConfiguration configuration) throws Exception
+ {
+ ObjectName objectName = objectNameBuilder.getClusterConnectionObjectName(configuration.getName());
+ ClusterConnectionControl control = new ClusterConnectionControlImpl(cluster, storageManager, configuration);
+ registerInJMX(objectName, new StandardMBean(control, ClusterConnectionControl.class));
+ registerInRegistry(ResourceNames.CORE_CLUSTER_CONNECTION + configuration.getName(), control);
+ }
+
+ public synchronized void unregisterCluster(final String name) throws Exception
+ {
+ ObjectName objectName = objectNameBuilder.getClusterConnectionObjectName(name);
+ unregisterFromJMX(objectName);
+ unregisterFromRegistry(ResourceNames.CORE_CLUSTER_CONNECTION + name);
+ }
+
+ public ServerMessage handleMessage(final ServerMessage message) throws Exception
+ {
+ // a reply message is sent with the result stored in the message body.
+ ServerMessage reply = new ServerMessageImpl(storageManager.generateUniqueID(), 512);
+
+ String resourceName = message.getStringProperty(ManagementHelper.HDR_RESOURCE_NAME);
+ if (ManagementServiceImpl.log.isDebugEnabled())
+ {
+ ManagementServiceImpl.log.debug("handling management message for " + resourceName);
+ }
+
+ String operation = message.getStringProperty(ManagementHelper.HDR_OPERATION_NAME);
+
+ if (operation != null)
+ {
+ Object[] params = ManagementHelper.retrieveOperationParameters(message);
+
+ if (params == null)
+ {
+ params = new Object[0];
+ }
+
+ try
+ {
+ Object result = invokeOperation(resourceName, operation, params);
+
+ ManagementHelper.storeResult(reply, result);
+
+ reply.putBooleanProperty(ManagementHelper.HDR_OPERATION_SUCCEEDED, true);
+ }
+ catch (Exception e)
+ {
+ ManagementServiceImpl.log.warn("exception while invoking " + operation + " on " + resourceName, e);
+ reply.putBooleanProperty(ManagementHelper.HDR_OPERATION_SUCCEEDED, false);
+ String exceptionMessage = e.getMessage();
+ if (e instanceof InvocationTargetException)
+ {
+ exceptionMessage = ((InvocationTargetException)e).getTargetException().getMessage();
+ }
+ if (e != null)
+ {
+ ManagementHelper.storeResult(reply, exceptionMessage);
+ }
+ }
+ }
+ else
+ {
+ String attribute = message.getStringProperty(ManagementHelper.HDR_ATTRIBUTE);
+
+ if (attribute != null)
+ {
+ try
+ {
+ Object result = getAttribute(resourceName, attribute);
+
+ ManagementHelper.storeResult(reply, result);
+ }
+ catch (Exception e)
+ {
+ ManagementServiceImpl.log.warn("exception while retrieving attribute " + attribute +
+ " on " +
+ resourceName, e);
+ reply.putBooleanProperty(ManagementHelper.HDR_OPERATION_SUCCEEDED, false);
+ String exceptionMessage = e.getMessage();
+ if (e instanceof InvocationTargetException)
+ {
+ exceptionMessage = ((InvocationTargetException)e).getTargetException().getMessage();
+ }
+ if (e != null)
+ {
+ ManagementHelper.storeResult(reply, exceptionMessage);
+ }
+ }
+ }
+ }
+
+ return reply;
+ }
+
+ public Object getResource(final String resourceName)
+ {
+ return registry.get(resourceName);
+ }
+
+ public Object[] getResources(final Class<?> resourceType)
+ {
+ List<Object> resources = new ArrayList<Object>();
+ for (Object entry : registry.values())
+ {
+ if (resourceType.isAssignableFrom(entry.getClass()))
+ {
+ resources.add(entry);
+ }
+ }
+ return resources.toArray(new Object[resources.size()]);
+ }
+
+ private final Set<ObjectName> registeredNames = new HashSet<ObjectName>();
+
+ public void registerInJMX(final ObjectName objectName, final Object managedResource) throws Exception
+ {
+ if (!jmxManagementEnabled)
+ {
+ return;
+ }
+
+ synchronized (mbeanServer)
+ {
+ unregisterFromJMX(objectName);
+
+ mbeanServer.registerMBean(managedResource, objectName);
+
+ registeredNames.add(objectName);
+ }
+ }
+
+ public synchronized void registerInRegistry(final String resourceName, final Object managedResource)
+ {
+ unregisterFromRegistry(resourceName);
+
+ registry.put(resourceName, managedResource);
+ }
+
+ public void unregisterFromRegistry(final String resourceName)
+ {
+ registry.remove(resourceName);
+ }
+
+ // the JMX unregistration is synchronized to avoid race conditions if 2 clients tries to
+ // unregister the same resource (e.g. a queue) at the same time since unregisterMBean()
+ // will throw an exception if the MBean has already been unregistered
+ public void unregisterFromJMX(final ObjectName objectName) throws Exception
+ {
+ if (!jmxManagementEnabled)
+ {
+ return;
+ }
+
+ synchronized (mbeanServer)
+ {
+ if (mbeanServer.isRegistered(objectName))
+ {
+ mbeanServer.unregisterMBean(objectName);
+
+ registeredNames.remove(objectName);
+ }
+ }
+ }
+
+ public void addNotificationListener(final NotificationListener listener)
+ {
+ listeners.add(listener);
+ }
+
+ public void removeNotificationListener(final NotificationListener listener)
+ {
+ listeners.remove(listener);
+ }
+
+ public SimpleString getManagementAddress()
+ {
+ return managementAddress;
+ }
+
+ public SimpleString getManagementNotificationAddress()
+ {
+ return managementNotificationAddress;
+ }
+
+ public String getClusterUser()
+ {
+ return managementClusterUser;
+ }
+
+ public String getClusterPassword()
+ {
+ return managementClusterPassword;
+ }
+
+ // HornetQComponent implementation -----------------------------
+
+ public void start() throws Exception
+ {
+ if (messageCounterEnabled)
+ {
+ messageCounterManager.start();
+ }
+
+ started = true;
+ }
+
+ public synchronized void stop() throws Exception
+ {
+ Set<String> resourceNames = new HashSet<String>(registry.keySet());
+
+ for (String resourceName : resourceNames)
+ {
+ unregisterFromRegistry(resourceName);
+ }
+
+ if (jmxManagementEnabled)
+ {
+ if (!registeredNames.isEmpty())
+ {
+ List<String> unexpectedResourceNames = new ArrayList<String>();
+ for (String name : resourceNames)
+ {
+ // only addresses and queues should still be registered
+ if (!(name.startsWith(ResourceNames.CORE_ADDRESS) || name.startsWith(ResourceNames.CORE_QUEUE)))
+ {
+ unexpectedResourceNames.add(name);
+ }
+ }
+ if (!unexpectedResourceNames.isEmpty())
+ {
+ ManagementServiceImpl.log.warn("On ManagementService stop, there are " + unexpectedResourceNames.size() +
+ " unexpected registered MBeans: " +
+ unexpectedResourceNames);
+ }
+
+ for (ObjectName on : registeredNames)
+ {
+ try
+ {
+ mbeanServer.unregisterMBean(on);
+ }
+ catch (Exception ignore)
+ {
+ }
+ }
+ }
+ }
+
+ if (messageCounterManager != null)
+ {
+ messageCounterManager.stop();
+
+ messageCounterManager.resetAllCounters();
+
+ messageCounterManager.resetAllCounterHistories();
+
+ messageCounterManager.clear();
+ }
+
+ registeredNames.clear();
+
+ started = false;
+ }
+
+ public boolean isStarted()
+ {
+ return started;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ public void sendNotification(final Notification notification) throws Exception
+ {
+ if (messagingServerControl != null && notificationsEnabled)
+ {
+ // This needs to be synchronized since we need to ensure notifications are processed in strict sequence
+ synchronized (this)
+ {
+ // We also need to synchronize on the post office notification lock
+ // otherwise we can get notifications arriving in wrong order / missing
+ // if a notification occurs at same time as sendQueueInfoToQueue is processed
+ synchronized (postOffice.getNotificationLock())
+ {
+
+ // First send to any local listeners
+ for (NotificationListener listener : listeners)
+ {
+ try
+ {
+ listener.onNotification(notification);
+ }
+ catch (Exception e)
+ {
+ // Exception thrown from one listener should not stop execution of others
+ ManagementServiceImpl.log.error("Failed to call listener", e);
+ }
+ }
+
+ // start sending notification *messages* only when the *remoting service* if started
+ if (messagingServer == null || !messagingServer.getRemotingService().isStarted())
+ {
+ return;
+ }
+
+ long messageID = storageManager.generateUniqueID();
+
+ ServerMessage notificationMessage = new ServerMessageImpl(messageID, 512);
+
+ // Notification messages are always durable so the user can choose whether to add a durable queue to
+ // consume
+ // them in
+ notificationMessage.setDurable(true);
+ notificationMessage.setAddress(managementNotificationAddress);
+
+ TypedProperties notifProps;
+ if (notification.getProperties() != null)
+ {
+ notifProps = new TypedProperties(notification.getProperties());
+ }
+ else
+ {
+ notifProps = new TypedProperties();
+ }
+
+ notifProps.putSimpleStringProperty(ManagementHelper.HDR_NOTIFICATION_TYPE,
+ new SimpleString(notification.getType().toString()));
+
+ notifProps.putLongProperty(ManagementHelper.HDR_NOTIFICATION_TIMESTAMP, System.currentTimeMillis());
+
+ if (notification.getUID() != null)
+ {
+ notifProps.putSimpleStringProperty(new SimpleString("foobar"),
+ new SimpleString(notification.getUID()));
+ }
+
+ notificationMessage.putTypedProperties(notifProps);
+
+ postOffice.route(notificationMessage);
+ }
+ }
+ }
+ }
+
+ public void enableNotifications(final boolean enabled)
+ {
+ notificationsEnabled = enabled;
+ }
+
+ public Object getAttribute(final String resourceName, final String attribute)
+ {
+ try
+ {
+ Object resource = registry.get(resourceName);
+
+ if (resource == null)
+ {
+ throw new IllegalArgumentException("Cannot find resource with name " + resourceName);
+ }
+
+ Method method = null;
+
+ String upperCaseAttribute = attribute.substring(0, 1).toUpperCase() + attribute.substring(1);
+ try
+ {
+ method = resource.getClass().getMethod("get" + upperCaseAttribute, new Class[0]);
+ }
+ catch (NoSuchMethodException nsme)
+ {
+ try
+ {
+ method = resource.getClass().getMethod("is" + upperCaseAttribute, new Class[0]);
+ }
+ catch (NoSuchMethodException nsme2)
+ {
+ throw new IllegalArgumentException("no getter method for " + attribute);
+ }
+ }
+ return method.invoke(resource, new Object[0]);
+ }
+ catch (Throwable t)
+ {
+ throw new IllegalStateException("Problem while retrieving attribute " + attribute, t);
+ }
+ }
+
+ private Object invokeOperation(final String resourceName, final String operation, final Object[] params) throws Exception
+ {
+ Object resource = registry.get(resourceName);
+
+ if (resource == null)
+ {
+ throw new IllegalArgumentException("Cannot find resource with name " + resourceName);
+ }
+
+ Method method = null;
+
+ Method[] methods = resource.getClass().getMethods();
+ for (Method m : methods)
+ {
+ if (m.getName().equals(operation) && m.getParameterTypes().length == params.length)
+ {
+ boolean match = true;
+
+ Class<?>[] paramTypes = m.getParameterTypes();
+
+ for (int i = 0; i < paramTypes.length; i++)
+ {
+ if (params[i] == null)
+ {
+ continue;
+ }
+ if (paramTypes[i].isAssignableFrom(params[i].getClass()) || paramTypes[i] == Long.TYPE &&
+ params[i].getClass() == Integer.class ||
+ paramTypes[i] == Double.TYPE &&
+ params[i].getClass() == Integer.class ||
+ paramTypes[i] == Long.TYPE &&
+ params[i].getClass() == Long.class ||
+ paramTypes[i] == Double.TYPE &&
+ params[i].getClass() == Double.class ||
+ paramTypes[i] == Integer.TYPE &&
+ params[i].getClass() == Integer.class ||
+ paramTypes[i] == Boolean.TYPE &&
+ params[i].getClass() == Boolean.class)
+ {
+ // parameter match
+ }
+ else
+ {
+ match = false;
+ break; // parameter check loop
+ }
+ }
+
+ if (match)
+ {
+ method = m;
+ break; // method match loop
+ }
+ }
+ }
+
+ if (method == null)
+ {
+ throw new IllegalArgumentException("no operation " + operation + "/" + params.length);
+ }
+
+ Object result = method.invoke(resource, params);
+
+ return result;
+ }
+
+ // Inner classes -------------------------------------------------
+}
Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyAcceptor.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -29,14 +29,14 @@
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.management.Notification;
-import org.hornetq.core.management.NotificationService;
import org.hornetq.core.management.NotificationType;
import org.hornetq.core.remoting.impl.ssl.SSLSupport;
import org.hornetq.core.remoting.spi.Acceptor;
import org.hornetq.core.remoting.spi.BufferHandler;
import org.hornetq.core.remoting.spi.Connection;
import org.hornetq.core.remoting.spi.ConnectionLifeCycleListener;
+import org.hornetq.core.server.management.Notification;
+import org.hornetq.core.server.management.NotificationService;
import org.hornetq.utils.ConfigurationHelper;
import org.hornetq.utils.SimpleString;
import org.hornetq.utils.TypedProperties;
Modified: trunk/src/main/org/hornetq/jms/server/management/DestinationControl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/management/DestinationControl.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/jms/server/management/DestinationControl.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -15,8 +15,8 @@
import javax.management.MBeanOperationInfo;
-import org.hornetq.core.management.Operation;
-import org.hornetq.core.management.Parameter;
+import org.hornetq.core.server.management.Operation;
+import org.hornetq.core.server.management.Parameter;
/**
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
Modified: trunk/src/main/org/hornetq/jms/server/management/JMSQueueControl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/management/JMSQueueControl.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/jms/server/management/JMSQueueControl.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -17,8 +17,8 @@
import javax.management.MBeanOperationInfo;
-import org.hornetq.core.management.Operation;
-import org.hornetq.core.management.Parameter;
+import org.hornetq.core.server.management.Operation;
+import org.hornetq.core.server.management.Parameter;
/**
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
Modified: trunk/src/main/org/hornetq/jms/server/management/JMSServerControl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/management/JMSServerControl.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/jms/server/management/JMSServerControl.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -17,8 +17,8 @@
import javax.management.MBeanOperationInfo;
-import org.hornetq.core.management.Operation;
-import org.hornetq.core.management.Parameter;
+import org.hornetq.core.server.management.Operation;
+import org.hornetq.core.server.management.Parameter;
/**
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
Modified: trunk/src/main/org/hornetq/jms/server/management/TopicControl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/management/TopicControl.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/jms/server/management/TopicControl.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -17,8 +17,8 @@
import javax.management.MBeanOperationInfo;
-import org.hornetq.core.management.Operation;
-import org.hornetq.core.management.Parameter;
+import org.hornetq.core.server.management.Operation;
+import org.hornetq.core.server.management.Parameter;
/**
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
Modified: trunk/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -18,11 +18,11 @@
import javax.management.ObjectName;
import org.hornetq.core.management.AddressControl;
-import org.hornetq.core.management.ManagementService;
import org.hornetq.core.management.QueueControl;
import org.hornetq.core.management.ResourceNames;
import org.hornetq.core.messagecounter.MessageCounter;
import org.hornetq.core.messagecounter.MessageCounterManager;
+import org.hornetq.core.server.management.ManagementService;
import org.hornetq.jms.HornetQQueue;
import org.hornetq.jms.HornetQTopic;
import org.hornetq.jms.client.HornetQConnectionFactory;
Modified: trunk/src/main/org/hornetq/jms/server/management/impl/JMSTopicControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/management/impl/JMSTopicControlImpl.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/src/main/org/hornetq/jms/server/management/impl/JMSTopicControlImpl.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -24,9 +24,9 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.AddressControl;
import org.hornetq.core.management.HornetQServerControl;
-import org.hornetq.core.management.ManagementService;
import org.hornetq.core.management.QueueControl;
import org.hornetq.core.management.ResourceNames;
+import org.hornetq.core.server.management.ManagementService;
import org.hornetq.jms.HornetQTopic;
import org.hornetq.jms.client.HornetQMessage;
import org.hornetq.jms.client.SelectorTranslator;
Modified: trunk/tests/config/ConfigurationTest-full-config.xml
===================================================================
--- trunk/tests/config/ConfigurationTest-full-config.xml 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/tests/config/ConfigurationTest-full-config.xml 2009-12-10 15:25:39 UTC (rev 8664)
@@ -12,7 +12,6 @@
<wild-card-routing-enabled>true</wild-card-routing-enabled>
<management-address>Giraffe</management-address>
<management-notification-address>Whatever</management-notification-address>
- <management-request-timeout>91</management-request-timeout>
<management-cluster-user>Frog</management-cluster-user>
<management-cluster-password>Wombat</management-cluster-password>
<jmx-management-enabled>false</jmx-management-enabled>
Modified: trunk/tests/src/org/hornetq/tests/integration/SimpleNotificationService.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/SimpleNotificationService.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/tests/src/org/hornetq/tests/integration/SimpleNotificationService.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -16,9 +16,9 @@
import java.util.ArrayList;
import java.util.List;
-import org.hornetq.core.management.Notification;
-import org.hornetq.core.management.NotificationListener;
-import org.hornetq.core.management.NotificationService;
+import org.hornetq.core.server.management.Notification;
+import org.hornetq.core.server.management.NotificationListener;
+import org.hornetq.core.server.management.NotificationService;
/**
* A SimpleNotificationService
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusteredGroupingTest.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -18,8 +18,6 @@
import junit.framework.Assert;
import org.hornetq.core.client.management.impl.ManagementHelper;
-import org.hornetq.core.management.Notification;
-import org.hornetq.core.management.NotificationListener;
import org.hornetq.core.management.NotificationType;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.server.group.GroupingHandler;
@@ -27,6 +25,8 @@
import org.hornetq.core.server.group.impl.GroupingHandlerConfiguration;
import org.hornetq.core.server.group.impl.Proposal;
import org.hornetq.core.server.group.impl.Response;
+import org.hornetq.core.server.management.Notification;
+import org.hornetq.core.server.management.NotificationListener;
import org.hornetq.utils.SimpleString;
/**
Modified: trunk/tests/src/org/hornetq/tests/integration/discovery/DiscoveryTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/discovery/DiscoveryTest.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/tests/src/org/hornetq/tests/integration/discovery/DiscoveryTest.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -25,10 +25,10 @@
import org.hornetq.core.cluster.impl.DiscoveryGroupImpl;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.management.Notification;
import org.hornetq.core.management.NotificationType;
import org.hornetq.core.server.cluster.BroadcastGroup;
import org.hornetq.core.server.cluster.impl.BroadcastGroupImpl;
+import org.hornetq.core.server.management.Notification;
import org.hornetq.tests.integration.SimpleNotificationService;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/bridge/BridgeTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/bridge/BridgeTestBase.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/bridge/BridgeTestBase.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -38,12 +38,12 @@
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.management.ManagementService;
import org.hornetq.core.management.ResourceNames;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.core.server.HornetQ;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.management.ManagementService;
import org.hornetq.jms.bridge.ConnectionFactoryFactory;
import org.hornetq.jms.bridge.DestinationFactory;
import org.hornetq.jms.bridge.QualityOfServiceMode;
Modified: trunk/tests/src/org/hornetq/tests/integration/management/AcceptorControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/AcceptorControlTest.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/tests/src/org/hornetq/tests/integration/management/AcceptorControlTest.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -25,12 +25,12 @@
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.AcceptorControl;
-import org.hornetq.core.management.Notification;
import org.hornetq.core.management.NotificationType;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQ;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.management.Notification;
import org.hornetq.tests.integration.SimpleNotificationService;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.utils.SimpleString;
Modified: trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/tests/src/org/hornetq/tests/integration/management/BridgeControlTest.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -28,7 +28,6 @@
import org.hornetq.core.config.cluster.QueueConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.management.BridgeControl;
-import org.hornetq.core.management.Notification;
import org.hornetq.core.management.NotificationType;
import org.hornetq.core.management.ObjectNameBuilder;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
@@ -36,6 +35,7 @@
import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.core.server.HornetQ;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.management.Notification;
import org.hornetq.tests.integration.SimpleNotificationService;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.utils.Pair;
Modified: trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControlTest.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -29,7 +29,6 @@
import org.hornetq.core.config.cluster.QueueConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.management.ClusterConnectionControl;
-import org.hornetq.core.management.Notification;
import org.hornetq.core.management.NotificationType;
import org.hornetq.core.management.ObjectNameBuilder;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
@@ -37,6 +36,7 @@
import org.hornetq.core.remoting.impl.invm.TransportConstants;
import org.hornetq.core.server.HornetQ;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.management.Notification;
import org.hornetq.tests.integration.SimpleNotificationService;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.utils.Pair;
Modified: trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -97,7 +97,6 @@
Assert.assertEquals(conf.getManagementAddress().toString(), serverControl.getManagementAddress());
Assert.assertEquals(conf.getManagementNotificationAddress().toString(),
serverControl.getManagementNotificationAddress());
- Assert.assertEquals(conf.getManagementRequestTimeout(), serverControl.getManagementRequestTimeout());
Assert.assertEquals(conf.getIDCacheSize(), serverControl.getIDCacheSize());
Assert.assertEquals(conf.isPersistIDCache(), serverControl.isPersistIDCache());
Assert.assertEquals(conf.getBindingsDirectory(), serverControl.getBindingsDirectory());
Modified: trunk/tests/src/org/hornetq/tests/integration/management/ManagementServiceImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/ManagementServiceImplTest.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/tests/src/org/hornetq/tests/integration/management/ManagementServiceImplTest.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -22,13 +22,13 @@
import org.hornetq.core.management.AddressControl;
import org.hornetq.core.management.QueueControl;
import org.hornetq.core.management.ResourceNames;
-import org.hornetq.core.management.impl.ManagementServiceImpl;
import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
import org.hornetq.core.server.HornetQ;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.core.server.management.impl.ManagementServiceImpl;
import org.hornetq.tests.integration.server.FakeStorageManager;
import org.hornetq.tests.unit.core.postoffice.impl.FakeQueue;
import org.hornetq.tests.util.RandomUtil;
Modified: trunk/tests/src/org/hornetq/tests/unit/core/config/impl/ConfigurationImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/config/impl/ConfigurationImplTest.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/tests/src/org/hornetq/tests/unit/core/config/impl/ConfigurationImplTest.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -97,7 +97,6 @@
Assert.assertEquals(ConfigurationImpl.DEFAULT_MESSAGE_COUNTER_MAX_DAY_HISTORY,
conf.getMessageCounterMaxDayHistory());
Assert.assertEquals(ConfigurationImpl.DEFAULT_MESSAGE_COUNTER_SAMPLE_PERIOD, conf.getMessageCounterSamplePeriod());
- Assert.assertEquals(ConfigurationImpl.DEFAULT_MANAGEMENT_REQUEST_TIMEOUT, conf.getManagementRequestTimeout());
Assert.assertEquals(ConfigurationImpl.DEFAULT_ID_CACHE_SIZE, conf.getIDCacheSize());
Assert.assertEquals(ConfigurationImpl.DEFAULT_PERSIST_ID_CACHE, conf.isPersistIDCache());
Assert.assertEquals(ConfigurationImpl.DEFAULT_SERVER_DUMP_INTERVAL, conf.getServerDumpInterval());
@@ -234,10 +233,6 @@
conf.setManagementClusterUser(s);
Assert.assertEquals(s, conf.getManagementClusterUser());
- l = RandomUtil.randomLong();
- conf.setManagementRequestTimeout(l);
- Assert.assertEquals(l, conf.getManagementRequestTimeout());
-
i = RandomUtil.randomInt();
conf.setIDCacheSize(i);
Assert.assertEquals(i, conf.getIDCacheSize());
@@ -455,10 +450,6 @@
conf.setManagementClusterUser(s);
Assert.assertEquals(s, conf.getManagementClusterUser());
- l = RandomUtil.randomLong();
- conf.setManagementRequestTimeout(l);
- Assert.assertEquals(l, conf.getManagementRequestTimeout());
-
i = RandomUtil.randomInt();
conf.setIDCacheSize(i);
Assert.assertEquals(i, conf.getIDCacheSize());
Modified: trunk/tests/src/org/hornetq/tests/unit/core/config/impl/DefaultsFileConfigurationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/config/impl/DefaultsFileConfigurationTest.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/tests/src/org/hornetq/tests/unit/core/config/impl/DefaultsFileConfigurationTest.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -86,8 +86,6 @@
Assert.assertEquals(ConfigurationImpl.DEFAULT_MANAGEMENT_CLUSTER_PASSWORD, conf.getManagementClusterPassword());
- Assert.assertEquals(ConfigurationImpl.DEFAULT_MANAGEMENT_REQUEST_TIMEOUT, conf.getManagementRequestTimeout());
-
Assert.assertEquals(ConfigurationImpl.DEFAULT_ID_CACHE_SIZE, conf.getIDCacheSize());
Assert.assertEquals(ConfigurationImpl.DEFAULT_PERSIST_ID_CACHE, conf.isPersistIDCache());
Modified: trunk/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java 2009-12-10 11:53:59 UTC (rev 8663)
+++ trunk/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java 2009-12-10 15:25:39 UTC (rev 8664)
@@ -46,7 +46,6 @@
Assert.assertEquals(5423, conf.getSecurityInvalidationInterval());
Assert.assertEquals(true, conf.isWildcardRoutingEnabled());
Assert.assertEquals(new SimpleString("Giraffe"), conf.getManagementAddress());
- Assert.assertEquals(91, conf.getManagementRequestTimeout());
Assert.assertEquals(new SimpleString("Whatever"), conf.getManagementNotificationAddress());
Assert.assertEquals("Frog", conf.getManagementClusterUser());
Assert.assertEquals("Wombat", conf.getManagementClusterPassword());
15 years, 3 months
JBoss hornetq SVN: r8663 - in trunk: src/main/org/hornetq/core/message and 7 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-12-10 06:53:59 -0500 (Thu, 10 Dec 2009)
New Revision: 8663
Modified:
trunk/examples/core/embedded/src/org/hornetq/core/example/EmbeddedExample.java
trunk/src/main/org/hornetq/core/message/Message.java
trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java
trunk/src/main/org/hornetq/core/remoting/spi/AcceptorFactory.java
trunk/src/main/org/hornetq/core/remoting/spi/BufferHandler.java
trunk/src/main/org/hornetq/core/remoting/spi/Connection.java
trunk/src/main/org/hornetq/core/remoting/spi/ConnectionLifeCycleListener.java
trunk/src/main/org/hornetq/core/remoting/spi/Connector.java
trunk/src/main/org/hornetq/core/remoting/spi/ConnectorFactory.java
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java
trunk/tests/src/org/hornetq/tests/integration/client/JournalCrashTest.java
Log:
javadocs and some refactoring of message
Modified: trunk/examples/core/embedded/src/org/hornetq/core/example/EmbeddedExample.java
===================================================================
--- trunk/examples/core/embedded/src/org/hornetq/core/example/EmbeddedExample.java 2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/examples/core/embedded/src/org/hornetq/core/example/EmbeddedExample.java 2009-12-10 11:53:59 UTC (rev 8663)
@@ -13,6 +13,7 @@
package org.hornetq.core.example;
import java.util.Date;
+import java.util.HashMap;
import org.hornetq.core.client.ClientConsumer;
import org.hornetq.core.client.ClientMessage;
@@ -114,4 +115,12 @@
}
}
+ {
+ HashMap<String, Object> map = new HashMap<String, Object>();
+ map.put("host", "localhost");
+ map.put("port", 5445);
+ TransportConfiguration config = new TransportConfiguration(InVMConnectorFactory.class.getName(), map);
+ ClientSessionFactory sf = new ClientSessionFactoryImpl(config);
+ }
+
}
Modified: trunk/src/main/org/hornetq/core/message/Message.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/Message.java 2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/message/Message.java 2009-12-10 11:53:59 UTC (rev 8663)
@@ -166,10 +166,8 @@
HornetQBuffer getBodyBuffer();
// Properties
- // ------------------------------------------------------------------
+ // -----------------------------------------------------------------
- TypedProperties getProperties();
-
/**
* Puts a boolean property in this message.
*
Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-12-10 11:53:59 UTC (rev 8663)
@@ -792,11 +792,6 @@
return properties.getPropertyNames();
}
- public TypedProperties getProperties()
- {
- return properties;
- }
-
public HornetQBuffer getWholeBuffer()
{
return buffer;
@@ -815,6 +810,12 @@
// Private -------------------------------------------------------
+
+ private TypedProperties getProperties()
+ {
+ return properties;
+ }
+
// This must be synchronized as it can be called concurrently id the message is being delivered concurently to
// many queues - the first caller in this case will actually encode it
private synchronized HornetQBuffer encodeToBuffer()
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-12-10 11:53:59 UTC (rev 8663)
@@ -1255,9 +1255,9 @@
messageEncoding.decode(buff);
- if (largeMessage.getProperties().containsProperty(MessageImpl.HDR_ORIG_MESSAGE_ID))
+ if (largeMessage.containsProperty(MessageImpl.HDR_ORIG_MESSAGE_ID))
{
- long originalMessageID = largeMessage.getProperties().getLongProperty(MessageImpl.HDR_ORIG_MESSAGE_ID);
+ long originalMessageID = largeMessage.getLongProperty(MessageImpl.HDR_ORIG_MESSAGE_ID);
LargeServerMessage originalMessage = (LargeServerMessage)messages.get(originalMessageID);
Modified: trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/remoting/impl/invm/InVMConnection.java 2009-12-10 11:53:59 UTC (rev 8663)
@@ -151,9 +151,4 @@
return "invm:" + serverID;
}
- public void fail(final HornetQException me)
- {
- listener.connectionException(id, me);
- }
-
}
Modified: trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java 2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/remoting/spi/Acceptor.java 2009-12-10 11:53:59 UTC (rev 8663)
@@ -25,7 +25,15 @@
*/
public interface Acceptor extends HornetQComponent
{
+ /**
+ * Pause the acceptor and stop it from receiving client requests.
+ */
void pause();
+ /**
+ * Set the notification service for this acceptor to use.
+ *
+ * @param notificationService the notification service
+ */
void setNotificationService(NotificationService notificationService);
}
Modified: trunk/src/main/org/hornetq/core/remoting/spi/AcceptorFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/spi/AcceptorFactory.java 2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/remoting/spi/AcceptorFactory.java 2009-12-10 11:53:59 UTC (rev 8663)
@@ -19,17 +19,38 @@
import java.util.concurrent.ScheduledExecutorService;
/**
+ * A factory for creating acceptors.
+ * <p/>
+ * An Acceptor is an endpoin that a {@link org.hornetq.core.remoting.spi.Connector} will connect to and is used by the remoting service.
+ *
* @author <a href="ataylor(a)redhat.com">Andy Taylor</a>
* @author <a href="tim.fox(a)jboss.com">Tim Fox</a>
* @author <a href="jmesnil(a)redhat.com">Jeff Mesnil</a>
*/
public interface AcceptorFactory
{
+ /**
+ * Create a new instance of an Acceptor.
+ *
+ * @param configuration the configuration
+ * @param handler the handler
+ * @param listener the listener
+ * @param threadPool the threadpool
+ * @param scheduledThreadPool a scheduled thread pool
+ * @return an acceptor
+ */
Acceptor createAcceptor(final Map<String, Object> configuration,
BufferHandler handler,
ConnectionLifeCycleListener listener,
Executor threadPool,
ScheduledExecutorService scheduledThreadPool);
+ /**
+ * Returns the allowable properties for this acceptor.
+ * <p/>
+ * This will differ between different acceptor implementations.
+ *
+ * @return the allowable properties.
+ */
Set<String> getAllowableProperties();
}
Modified: trunk/src/main/org/hornetq/core/remoting/spi/BufferHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/spi/BufferHandler.java 2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/remoting/spi/BufferHandler.java 2009-12-10 11:53:59 UTC (rev 8663)
@@ -15,14 +15,29 @@
import org.hornetq.core.buffers.HornetQBuffer;
/**
- * A BufferHandler
- *
+ * A BufferHandler that will handle buffers received by an acceptor.
+ * <p/>
+ * The Buffer Handler will decode the buffer and take the appropriate action, typically forwarding to the correct channel.
+ *
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
*/
public interface BufferHandler
{
+ /**
+ * called by the remoting connection when a buffer is received.
+ *
+ * @param connectionID the connection the buffer was received on
+ * @param buffer the buffer to decode
+ */
void bufferReceived(Object connectionID, HornetQBuffer buffer);
+ /**
+ * called by the remoting connection prior to {@link org.hornetq.core.remoting.spi.BufferHandler#bufferReceived(Object, org.hornetq.core.buffers.HornetQBuffer)}.
+ * <p/>
+ * The implementation should return true if there is enough data in the buffer to decode. otherwise false.
+ *
+ * @param buffer the buffer
+ * @return true id the buffer can be decoded..
+ */
int isReadyToHandle(HornetQBuffer buffer);
}
Modified: trunk/src/main/org/hornetq/core/remoting/spi/Connection.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/spi/Connection.java 2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/remoting/spi/Connection.java 2009-12-10 11:53:59 UTC (rev 8663)
@@ -17,25 +17,52 @@
import org.hornetq.core.exception.HornetQException;
/**
+ * The connection used by a channel to write data to.
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
* @version <tt>$Revision$</tt>
- *
*/
public interface Connection
{
+ /**
+ * Create a new HornetQBuffer of the given size.
+ *
+ * @param size the size of buffer to create
+ * @return the new buffer.
+ */
HornetQBuffer createBuffer(int size);
+ /**
+ * returns the unique id of this wire.
+ *
+ * @return the id
+ */
Object getID();
+ /**
+ * writes the buffer to the wire.
+ *
+ * @param buffer the buffer to write
+ */
void write(HornetQBuffer buffer);
+ /**
+ * writes the buffer to the connection and if flush is true returns only when the buffer has been physically written to the connection.
+ *
+ * @param buffer the buffer to write
+ * @param flush whether to flush the buffers onto the wire
+ */
void write(HornetQBuffer buffer, boolean flush);
+ /**
+ * closes this connection.
+ */
void close();
+ /**
+ * returns a string representation of the remote address this connection is connected to.
+ *
+ * @return the remote address
+ */
String getRemoteAddress();
-
- void fail(HornetQException me);
}
\ No newline at end of file
Modified: trunk/src/main/org/hornetq/core/remoting/spi/ConnectionLifeCycleListener.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/spi/ConnectionLifeCycleListener.java 2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/remoting/spi/ConnectionLifeCycleListener.java 2009-12-10 11:53:59 UTC (rev 8663)
@@ -15,17 +15,32 @@
import org.hornetq.core.exception.HornetQException;
/**
+ * A ConnectionLifeCycleListener is called by the remoting implementation to notify of connection events.
*
- * A ConnectionLifeCycleListener
- *
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
*/
public interface ConnectionLifeCycleListener
{
+ /**
+ * called when a connection is created.
+ *
+ * @param connection the connection that has been created
+ */
void connectionCreated(Connection connection);
+ /**
+ * called when a connection is destroyed.
+ *
+ * @param connectionID the connection being destroyed.
+ */
void connectionDestroyed(Object connectionID);
+
+ /**
+ * called when an error occurs on the connection.
+ *
+ * @param connectionID the id of the connection.
+ * @param me the exception.
+ */
void connectionException(Object connectionID, HornetQException me);
}
Modified: trunk/src/main/org/hornetq/core/remoting/spi/Connector.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/spi/Connector.java 2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/remoting/spi/Connector.java 2009-12-10 11:53:59 UTC (rev 8663)
@@ -13,26 +13,35 @@
package org.hornetq.core.remoting.spi;
/**
- *
- * A Connector
- *
+ * A Connector is used by the client for creating and controlling a connection.
+ *
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
*/
public interface Connector
{
+ /**
+ * starts the connector
+ */
void start();
+ /**
+ * closes the connector
+ */
void close();
+ /**
+ * returns true if the connector is started, oterwise false.
+ *
+ * @return true if the connector is started
+ */
boolean isStarted();
/**
* Create and return a connection from this connector.
- *
+ * <p/>
* This method must NOT throw an exception if it fails to create the connection
* (e.g. network is not available), in this case it MUST return null
- *
+ *
* @return The connection, or null if unable to create a connection (e.g. network is unavailable)
*/
Connection createConnection();
Modified: trunk/src/main/org/hornetq/core/remoting/spi/ConnectorFactory.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/spi/ConnectorFactory.java 2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/remoting/spi/ConnectorFactory.java 2009-12-10 11:53:59 UTC (rev 8663)
@@ -19,14 +19,25 @@
import java.util.concurrent.ScheduledExecutorService;
/**
- *
- * A ConnectorFactory
- *
+ * A ConnectorFactory is used by the client for creating connectors.
+ * <p/>
+ * A Connector is used to connect to an {@link org.hornetq.core.remoting.spi.Acceptor}.
+ *
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- *
*/
public interface ConnectorFactory
{
+ /**
+ * creates a new instanc of a connector.
+ *
+ * @param configuration the configuration
+ * @param handler the handler
+ * @param listener the listener
+ * @param closeExecutor the close executor
+ * @param threadPool the threadpool
+ * @param scheduledThreadPool the scheduled thread pool
+ * @return a new connector
+ */
Connector createConnector(Map<String, Object> configuration,
BufferHandler handler,
ConnectionLifeCycleListener listener,
@@ -34,5 +45,12 @@
Executor threadPool,
ScheduledExecutorService scheduledThreadPool);
+ /**
+ * Returns the allowable properties for this connector.
+ * <p/>
+ * This will differ between different connector implementations.
+ *
+ * @return the allowable properties.
+ */
Set<String> getAllowableProperties();
}
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-12-10 11:53:59 UTC (rev 8663)
@@ -16,11 +16,7 @@
import static org.hornetq.core.management.NotificationType.CONSUMER_CLOSED;
import static org.hornetq.core.management.NotificationType.CONSUMER_CREATED;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ScheduledExecutorService;
@@ -761,8 +757,27 @@
binding.addConsumer(filterString);
// Need to propagate the consumer add
- Notification notification = new Notification(null, CONSUMER_CREATED, message.getProperties());
+ TypedProperties props = new TypedProperties();
+ props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
+
+ props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, clusterName);
+
+ props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
+
+ props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance + 1);
+
+ Queue theQueue = (Queue)binding.getBindable();
+
+ props.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT, theQueue.getConsumerCount());
+
+ if (filterString != null)
+ {
+ props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
+ }
+
+ Notification notification = new Notification(null, CONSUMER_CREATED, props);
+
managementService.sendNotification(notification);
}
@@ -796,8 +811,26 @@
binding.removeConsumer(filterString);
// Need to propagate the consumer close
- Notification notification = new Notification(null, CONSUMER_CLOSED, message.getProperties());
+ TypedProperties props = new TypedProperties();
+ props.putSimpleStringProperty(ManagementHelper.HDR_ADDRESS, binding.getAddress());
+
+ props.putSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME, clusterName);
+
+ props.putSimpleStringProperty(ManagementHelper.HDR_ROUTING_NAME, binding.getRoutingName());
+
+ props.putIntProperty(ManagementHelper.HDR_DISTANCE, distance + 1);
+
+ Queue theQueue = (Queue)binding.getBindable();
+
+ props.putIntProperty(ManagementHelper.HDR_CONSUMER_COUNT, theQueue.getConsumerCount());
+
+ if (filterString != null)
+ {
+ props.putSimpleStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
+ }
+ Notification notification = new Notification(null, CONSUMER_CLOSED, props);
+
managementService.sendNotification(notification);
}
Modified: trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java
===================================================================
--- trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java 2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/src/main/org/hornetq/integration/transports/netty/NettyConnection.java 2009-12-10 11:53:59 UTC (rev 8663)
@@ -145,11 +145,6 @@
return channel.getRemoteAddress().toString();
}
- public void fail(final HornetQException me)
- {
- listener.connectionException(channel.getId(), me);
- }
-
// Public --------------------------------------------------------
@Override
Modified: trunk/tests/src/org/hornetq/tests/integration/client/JournalCrashTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/JournalCrashTest.java 2009-12-10 11:44:51 UTC (rev 8662)
+++ trunk/tests/src/org/hornetq/tests/integration/client/JournalCrashTest.java 2009-12-10 11:53:59 UTC (rev 8663)
@@ -181,7 +181,7 @@
for (int i = start; i < end; i++)
{
ClientMessage msg = session.createMessage(true);
- msg.getProperties().putIntProperty(new SimpleString("key"), i);
+ msg.putIntProperty(new SimpleString("key"), i);
msg.getBodyBuffer().writeUTF("message " + i);
prod.send(msg);
}
15 years, 3 months
JBoss hornetq SVN: r8662 - in trunk/src/main/org/hornetq: core/client and 6 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-12-10 06:44:51 -0500 (Thu, 10 Dec 2009)
New Revision: 8662
Added:
trunk/src/main/org/hornetq/core/message/impl/MessageInternal.java
Modified:
trunk/src/main/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java
trunk/src/main/org/hornetq/core/client/ClientMessage.java
trunk/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java
trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
trunk/src/main/org/hornetq/core/message/Message.java
trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/MessagePacket.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
trunk/src/main/org/hornetq/core/server/ServerMessage.java
trunk/src/main/org/hornetq/jms/client/HornetQMessage.java
Log:
refactored message interface to separate out internal methods to messgeinternal interface
Modified: trunk/src/main/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java 2009-12-10 09:44:05 UTC (rev 8661)
+++ trunk/src/main/org/hornetq/core/buffers/impl/ResetLimitWrappedHornetQBuffer.java 2009-12-10 11:44:51 UTC (rev 8662)
@@ -17,7 +17,7 @@
import org.hornetq.core.buffers.HornetQBuffer;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.message.Message;
+import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.utils.SimpleString;
/**
@@ -32,9 +32,9 @@
private final int limit;
- private final Message message;
+ private final MessageInternal message;
- public ResetLimitWrappedHornetQBuffer(final int limit, final HornetQBuffer buffer, final Message message)
+ public ResetLimitWrappedHornetQBuffer(final int limit, final HornetQBuffer buffer, final MessageInternal message)
{
super(buffer.channelBuffer());
Modified: trunk/src/main/org/hornetq/core/client/ClientMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/ClientMessage.java 2009-12-10 09:44:05 UTC (rev 8661)
+++ trunk/src/main/org/hornetq/core/client/ClientMessage.java 2009-12-10 11:44:51 UTC (rev 8662)
@@ -64,9 +64,8 @@
/**
* Sets the OutputStream that will receive the content of a message received in a non blocking way.
*
- * This method is used for large message and is not meant to be called directly by HornetQ clients.
+ * This method is used when consuming large messages
*
- * @deprecated
* @throws HornetQException
*/
void setOutputStream(OutputStream out) throws HornetQException;
@@ -75,9 +74,8 @@
* Saves the content of the message to the OutputStream.
* It will block until the entire content is transfered to the OutputStream.
*
- * This method is used for large message and is not meant to be called directly by HornetQ clients.
+ * This method is used for when consuming large messages
*
- * @deprecated
* @throws HornetQException
*/
void saveToOutputStream(OutputStream out) throws HornetQException;
@@ -85,22 +83,20 @@
/**
* Wait the outputStream completion of the message.
*
- * This method is used for large message and is not meant to be called directly by HornetQ clients.
+ * This method is used when consuming large messages
*
* @param timeMilliseconds - 0 means wait forever
* @return true if it reached the end
* @throws HornetQException
-
- * @deprecated
+
*/
boolean waitOutputStreamCompletion(long timeMilliseconds) throws HornetQException;
/**
* Sets the body's IntputStream.
*
- * This method is used for large message and is not meant to be called directly by HornetQ clients.
+ * This method is used when sending large messages
*
- * @deprecated
* @throws HornetQException
*/
void setBodyInputStream(InputStream bodyInputStream);
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java 2009-12-10 09:44:05 UTC (rev 8661)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientMessageInternal.java 2009-12-10 11:44:51 UTC (rev 8662)
@@ -15,6 +15,7 @@
import org.hornetq.core.buffers.HornetQBuffer;
import org.hornetq.core.client.ClientMessage;
+import org.hornetq.core.message.impl.MessageInternal;
/**
* A ClientMessageInternal
@@ -25,7 +26,7 @@
*
*
*/
-public interface ClientMessageInternal extends ClientMessage
+public interface ClientMessageInternal extends ClientMessage, MessageInternal
{
/** Size used for FlowControl */
int getFlowControlSize();
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2009-12-10 09:44:05 UTC (rev 8661)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientProducerImpl.java 2009-12-10 11:44:51 UTC (rev 8662)
@@ -23,6 +23,7 @@
import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.message.Message;
import org.hornetq.core.message.impl.MessageImpl;
+import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.impl.wireformat.SessionSendContinuationMessage;
import org.hornetq.core.remoting.impl.wireformat.SessionSendLargeMessage;
@@ -201,18 +202,20 @@
private void doSend(final SimpleString address, final Message msg) throws HornetQException
{
+ MessageInternal msgI = (MessageInternal)msg;
+
ClientProducerCredits theCredits;
if (address != null)
{
- msg.setAddress(address);
+ msgI.setAddress(address);
// Anonymous
theCredits = session.getCredits(address);
}
else
{
- msg.setAddress(this.address);
+ msgI.setAddress(this.address);
theCredits = credits;
}
@@ -226,16 +229,16 @@
if (groupID != null)
{
- msg.putStringProperty(MessageImpl.HDR_GROUP_ID, groupID);
+ msgI.putStringProperty(MessageImpl.HDR_GROUP_ID, groupID);
}
- boolean sendBlocking = msg.isDurable() ? blockOnDurableSend : blockOnNonDurableSend;
+ boolean sendBlocking = msgI.isDurable() ? blockOnDurableSend : blockOnNonDurableSend;
session.workDone();
boolean isLarge;
- if (msg.getBodyInputStream() != null || msg.isLargeMessage())
+ if (msgI.getBodyInputStream() != null || msgI.isLargeMessage())
{
isLarge = true;
}
@@ -246,19 +249,19 @@
if (isLarge)
{
- largeMessageSend(sendBlocking, msg, theCredits);
+ largeMessageSend(sendBlocking, msgI, theCredits);
}
else
{
- SessionSendMessage message = new SessionSendMessage(msg, sendBlocking);
+ SessionSendMessage packet = new SessionSendMessage(msgI, sendBlocking);
if (sendBlocking)
{
- channel.sendBlocking(message);
+ channel.sendBlocking(packet);
}
else
{
- channel.send(message);
+ channel.send(packet);
}
}
@@ -272,7 +275,7 @@
if (!isLarge)
{
- theCredits.acquireCredits(msg.getEncodeSize());
+ theCredits.acquireCredits(msgI.getEncodeSize());
}
}
catch (InterruptedException e)
@@ -291,12 +294,12 @@
// Methods to send Large Messages----------------------------------------------------------------
/**
- * @param msg
+ * @param msgI
* @throws HornetQException
*/
- private void largeMessageSend(final boolean sendBlocking, final Message msg, final ClientProducerCredits credits) throws HornetQException
+ private void largeMessageSend(final boolean sendBlocking, final MessageInternal msgI, final ClientProducerCredits credits) throws HornetQException
{
- int headerSize = msg.getHeadersAndPropertiesEncodeSize();
+ int headerSize = msgI.getHeadersAndPropertiesEncodeSize();
if (headerSize >= minLargeMessageSize)
{
@@ -305,14 +308,14 @@
}
// msg.getBody() could be Null on LargeServerMessage
- if (msg.getBodyInputStream() == null && msg.getWholeBuffer() != null)
+ if (msgI.getBodyInputStream() == null && msgI.getWholeBuffer() != null)
{
- msg.getWholeBuffer().readerIndex(0);
+ msgI.getWholeBuffer().readerIndex(0);
}
HornetQBuffer headerBuffer = HornetQBuffers.fixedBuffer(headerSize);
- msg.encodeHeadersAndProperties(headerBuffer);
+ msgI.encodeHeadersAndProperties(headerBuffer);
SessionSendLargeMessage initialChunk = new SessionSendLargeMessage(headerBuffer.toByteBuffer().array());
@@ -320,13 +323,13 @@
try
{
- credits.acquireCredits(msg.getHeadersAndPropertiesEncodeSize());
+ credits.acquireCredits(msgI.getHeadersAndPropertiesEncodeSize());
}
catch (InterruptedException e)
{
}
- InputStream input = msg.getBodyInputStream();
+ InputStream input = msgI.getBodyInputStream();
if (input != null)
{
@@ -334,20 +337,20 @@
}
else
{
- largeMessageSendBuffered(sendBlocking, msg, credits);
+ largeMessageSendBuffered(sendBlocking, msgI, credits);
}
}
/**
* @param sendBlocking
- * @param msg
+ * @param msgI
* @throws HornetQException
*/
private void largeMessageSendBuffered(final boolean sendBlocking,
- final Message msg,
+ final MessageInternal msgI,
final ClientProducerCredits credits) throws HornetQException
{
- BodyEncoder context = msg.getBodyEncoder();
+ BodyEncoder context = msgI.getBodyEncoder();
final long bodySize = context.getLargeBodySize();
Modified: trunk/src/main/org/hornetq/core/message/Message.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/Message.java 2009-12-10 09:44:05 UTC (rev 8661)
+++ trunk/src/main/org/hornetq/core/message/Message.java 2009-12-10 11:44:51 UTC (rev 8662)
@@ -18,7 +18,6 @@
import java.util.Set;
import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.exception.HornetQException;
import org.hornetq.utils.SimpleString;
import org.hornetq.utils.TypedProperties;
@@ -53,7 +52,6 @@
* If conversion is not allowed (for example calling {@code getFloatProperty} on a property set a {@code boolean}),
* a PropertyConversionException will be thrown.
*
- * @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic@jboss.com">ClebertSuconic</a>
* @version <tt>$Revision: 3341 $</tt>
@@ -77,8 +75,6 @@
/**
* Sets the address to send this message to.
*
- * This method must not be called directly by HornetQ clients.
- *
* @param address address to send the message to
*/
void setAddress(SimpleString address);
@@ -169,43 +165,6 @@
*/
HornetQBuffer getBodyBuffer();
- // Should the following methods really be on the public API?
-
- /**
- * This method must not be called directly by HornetQ clients.
- */
- void decodeFromBuffer(HornetQBuffer buffer);
-
- /**
- * This method must not be called directly by HornetQ clients.
- */
- int getEndOfMessagePosition();
-
- /**
- * This method must not be called directly by HornetQ clients.
- */
- int getEndOfBodyPosition();
-
- /**
- * This method must not be called directly by HornetQ clients.
- */
- void checkCopy();
-
- /**
- * This method must not be called directly by HornetQ clients.
- */
- void bodyChanged();
-
- /**
- * This method must not be called directly by HornetQ clients.
- */
- void resetCopied();
-
- /**
- * This method must not be called directly by HornetQ clients.
- */
- HornetQBuffer getEncodedBuffer();
-
// Properties
// ------------------------------------------------------------------
@@ -523,39 +482,8 @@
*/
Set<SimpleString> getPropertyNames();
- Map<String, Object> toMap();
-
- // FIXME - All this stuff is only necessary here for large messages - it should be refactored to be put in a better
- // place
-
/**
- * This method must not be called directly by HornetQ clients.
+ * @return Returns the message in Map form, useful when encoding to JSON
*/
- int getHeadersAndPropertiesEncodeSize();
-
- /**
- * This method must not be called directly by HornetQ clients.
- */
- HornetQBuffer getWholeBuffer();
-
- /**
- * This method must not be called directly by HornetQ clients.
- */
- void encodeHeadersAndProperties(HornetQBuffer buffer);
-
- /**
- * This method must not be called directly by HornetQ clients.
- */
- void decodeHeadersAndProperties(HornetQBuffer buffer);
-
- /**
- * This method must not be called directly by HornetQ clients.
- */
- BodyEncoder getBodyEncoder() throws HornetQException;
-
- /**
- * This method must not be called directly by HornetQ clients.
- */
- InputStream getBodyInputStream();
-
+ Map<String, Object> toMap();
}
Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-12-10 09:44:05 UTC (rev 8661)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-12-10 11:44:51 UTC (rev 8662)
@@ -25,7 +25,6 @@
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.BodyEncoder;
-import org.hornetq.core.message.Message;
import org.hornetq.core.message.PropertyConversionException;
import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
import org.hornetq.utils.DataConstants;
@@ -46,7 +45,7 @@
*
* $Id: MessageSupport.java 2740 2007-05-30 11:36:28Z timfox $
*/
-public abstract class MessageImpl implements Message
+public abstract class MessageImpl implements MessageInternal
{
// Constants -----------------------------------------------------
Added: trunk/src/main/org/hornetq/core/message/impl/MessageInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageInternal.java (rev 0)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageInternal.java 2009-12-10 11:44:51 UTC (rev 8662)
@@ -0,0 +1,59 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.message.impl;
+
+import java.io.InputStream;
+
+import org.hornetq.core.buffers.HornetQBuffer;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.message.BodyEncoder;
+import org.hornetq.core.message.Message;
+
+/**
+ * A MessageInternal
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ * TODO - this can be refactored further to separate out large message specific stuff
+ *
+ *
+ */
+public interface MessageInternal extends Message
+{
+ void decodeFromBuffer(HornetQBuffer buffer);
+
+ int getEndOfMessagePosition();
+
+ int getEndOfBodyPosition();
+
+ void checkCopy();
+
+ void bodyChanged();
+
+ void resetCopied();
+
+ HornetQBuffer getEncodedBuffer();
+
+ int getHeadersAndPropertiesEncodeSize();
+
+ HornetQBuffer getWholeBuffer();
+
+ void encodeHeadersAndProperties(HornetQBuffer buffer);
+
+ void decodeHeadersAndProperties(HornetQBuffer buffer);
+
+ BodyEncoder getBodyEncoder() throws HornetQException;
+
+ InputStream getBodyInputStream();
+}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/MessagePacket.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/MessagePacket.java 2009-12-10 09:44:05 UTC (rev 8661)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/MessagePacket.java 2009-12-10 11:44:51 UTC (rev 8662)
@@ -15,6 +15,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.Message;
+import org.hornetq.core.message.impl.MessageInternal;
/**
* A MessagePacket
@@ -27,9 +28,9 @@
{
private static final Logger log = Logger.getLogger(MessagePacket.class);
- protected Message message;
+ protected MessageInternal message;
- public MessagePacket(final byte type, final Message message)
+ public MessagePacket(final byte type, final MessageInternal message)
{
super(type);
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java 2009-12-10 09:44:05 UTC (rev 8661)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java 2009-12-10 11:44:51 UTC (rev 8662)
@@ -16,7 +16,7 @@
import org.hornetq.core.buffers.HornetQBuffer;
import org.hornetq.core.client.impl.ClientMessageImpl;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.message.Message;
+import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.utils.DataConstants;
@@ -36,7 +36,7 @@
private int deliveryCount;
- public SessionReceiveMessage(final long consumerID, final Message message, final int deliveryCount)
+ public SessionReceiveMessage(final long consumerID, final MessageInternal message, final int deliveryCount)
{
super(PacketImpl.SESS_RECEIVE_MSG, message);
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java 2009-12-10 09:44:05 UTC (rev 8661)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java 2009-12-10 11:44:51 UTC (rev 8662)
@@ -16,6 +16,7 @@
import org.hornetq.core.buffers.HornetQBuffer;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.Message;
+import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.utils.DataConstants;
@@ -39,7 +40,7 @@
// Constructors --------------------------------------------------
- public SessionSendMessage(final Message message, final boolean requiresResponse)
+ public SessionSendMessage(final MessageInternal message, final boolean requiresResponse)
{
super(PacketImpl.SESS_SEND, message);
Modified: trunk/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerMessage.java 2009-12-10 09:44:05 UTC (rev 8661)
+++ trunk/src/main/org/hornetq/core/server/ServerMessage.java 2009-12-10 11:44:51 UTC (rev 8662)
@@ -14,7 +14,7 @@
package org.hornetq.core.server;
import org.hornetq.core.journal.EncodingSupport;
-import org.hornetq.core.message.Message;
+import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.paging.PagingStore;
/**
@@ -25,7 +25,7 @@
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
*
*/
-public interface ServerMessage extends Message, EncodingSupport
+public interface ServerMessage extends MessageInternal, EncodingSupport
{
void setMessageID(long id);
Modified: trunk/src/main/org/hornetq/jms/client/HornetQMessage.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQMessage.java 2009-12-10 09:44:05 UTC (rev 8661)
+++ trunk/src/main/org/hornetq/jms/client/HornetQMessage.java 2009-12-10 11:44:51 UTC (rev 8662)
@@ -700,12 +700,8 @@
public Object getObjectProperty(final String name) throws JMSException
{
- if (HornetQMessage.JMS_HORNETQ_INPUT_STREAM.equals(name))
+ if (HornetQMessage.JMSXDELIVERYCOUNT.equals(name))
{
- return message.getBodyInputStream();
- }
- else if (HornetQMessage.JMSXDELIVERYCOUNT.equals(name))
- {
return String.valueOf(message.getDeliveryCount());
}
15 years, 3 months
JBoss hornetq SVN: r8661 - in trunk: src/main/org/hornetq/core/server/impl and 3 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-12-10 04:44:05 -0500 (Thu, 10 Dec 2009)
New Revision: 8661
Modified:
trunk/src/main/org/hornetq/core/server/Queue.java
trunk/src/main/org/hornetq/core/server/ServerSession.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java
trunk/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
Log:
simpified receive immediate logic on the server
Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java 2009-12-10 09:19:17 UTC (rev 8660)
+++ trunk/src/main/org/hornetq/core/server/Queue.java 2009-12-10 09:44:05 UTC (rev 8661)
@@ -120,8 +120,6 @@
boolean hasMatchingConsumer(ServerMessage message);
- void deliverNow();
-
Collection<Consumer> getConsumers();
boolean checkDLQ(MessageReference ref) throws Exception;
Modified: trunk/src/main/org/hornetq/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerSession.java 2009-12-10 09:19:17 UTC (rev 8660)
+++ trunk/src/main/org/hornetq/core/server/ServerSession.java 2009-12-10 09:44:05 UTC (rev 8661)
@@ -68,7 +68,7 @@
void close() throws Exception;
- void promptDelivery(Queue queue, boolean async);
+ void promptDelivery(Queue queue);
void handleAcknowledge(final SessionAcknowledgeMessage packet);
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-12-10 09:19:17 UTC (rev 8660)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-12-10 09:44:05 UTC (rev 8661)
@@ -337,20 +337,19 @@
* When the consumer receives such a "forced delivery" message, it discards it
* and knows that there are no other messages to be delivered.
*/
-
- // TODO - why is this executed on a different thread?
public synchronized void forceDelivery(final long sequence)
{
+ promptDelivery();
+
executor.execute(new Runnable()
{
public void run()
{
try
{
- // The prompt delivery is called synchronously to ensure the "forced delivery" message is
- // sent after any queue delivery.
- promptDelivery(false);
-
+ // We execute this on the same executor to make sure the force delivery message is written after
+ // any delivery is completed
+
ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID(), 50);
forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
@@ -413,7 +412,7 @@
// Outside the lock
if (started)
{
- promptDelivery(true);
+ promptDelivery();
}
}
@@ -465,7 +464,7 @@
if (!transferring)
{
- promptDelivery(true);
+ promptDelivery();
}
}
@@ -491,7 +490,7 @@
if (previous <= 0 && previous + credits > 0)
{
- promptDelivery(true);
+ promptDelivery();
}
}
}
@@ -580,7 +579,7 @@
// Private --------------------------------------------------------------------------------------
- private void promptDelivery(final boolean asyncDelivery)
+ private void promptDelivery()
{
lock.lock();
try
@@ -595,18 +594,11 @@
{
if (browseOnly)
{
- if (asyncDelivery)
- {
- executor.execute(browserDeliverer);
- }
- else
- {
- browserDeliverer.run();
- }
+ executor.execute(browserDeliverer);
}
else
{
- session.promptDelivery(messageQueue, asyncDelivery);
+ session.promptDelivery(messageQueue);
}
}
}
@@ -668,7 +660,7 @@
else
{
// prompt Delivery only if chunk was finished
- session.promptDelivery(messageQueue, true);
+ session.promptDelivery(messageQueue);
}
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-12-10 09:19:17 UTC (rev 8660)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-12-10 09:44:05 UTC (rev 8661)
@@ -114,30 +114,7 @@
private static final Logger log = Logger.getLogger(ServerSessionImpl.class);
// Static -------------------------------------------------------------------------------
-
- // TODO not actually used currently
- // private static int offset;
- //
- // static
- // {
- // try
- // {
- // ServerMessage msg = new ServerMessageImpl(1, ChannelBuffers.EMPTY_BUFFER);
- //
- // msg.setDestination(new SimpleString("foobar"));
- //
- // int es = msg.getEncodeSize();
- //
- // int me = msg.getMemoryEstimate();
- //
- // offset = MessageReferenceImpl.getMemoryEstimate() + me - es;
- // }
- // catch (Exception e)
- // {
- // log.error("Failed to initialise mult and offset", e);
- // }
- // }
-
+
// Attributes ----------------------------------------------------------------------------
private final long id;
@@ -361,16 +338,9 @@
}
}
- public void promptDelivery(final Queue queue, final boolean async)
+ public void promptDelivery(final Queue queue)
{
- if (async)
- {
- queue.deliverAsync(executor);
- }
- else
- {
- queue.deliverNow();
- }
+ queue.deliverAsync(executor);
}
public void handleCreateConsumer(final SessionCreateConsumerMessage packet)
Modified: trunk/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java 2009-12-10 09:19:17 UTC (rev 8660)
+++ trunk/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java 2009-12-10 09:44:05 UTC (rev 8661)
@@ -21,6 +21,7 @@
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.QueueImpl;
import org.hornetq.tests.unit.core.server.impl.fakes.FakeConsumer;
import org.hornetq.tests.unit.core.server.impl.fakes.FakeQueueFactory;
import org.hornetq.tests.util.UnitTestCase;
@@ -60,7 +61,7 @@
*/
public void testConcurrentAddsDeliver() throws Exception
{
- Queue queue = queueFactory.createQueue(1,
+ QueueImpl queue = (QueueImpl)queueFactory.createQueue(1,
new SimpleString("address1"),
new SimpleString("queue1"),
null,
@@ -162,7 +163,7 @@
{
private volatile Exception e;
- private final Queue queue;
+ private final QueueImpl queue;
private final FakeConsumer consumer;
@@ -182,7 +183,7 @@
return e;
}
- Toggler(final Queue queue, final FakeConsumer consumer, final long testTime)
+ Toggler(final QueueImpl queue, final FakeConsumer consumer, final long testTime)
{
this.testTime = testTime;
Modified: trunk/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java 2009-12-10 09:19:17 UTC (rev 8660)
+++ trunk/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java 2009-12-10 09:44:05 UTC (rev 8661)
@@ -76,7 +76,7 @@
public void testScheduledNoConsumer() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
new SimpleString("address1"),
new SimpleString("queue1"),
null,
@@ -150,7 +150,7 @@
private void testScheduled(final boolean direct) throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
new SimpleString("address1"),
new SimpleString("queue1"),
null,
@@ -265,7 +265,7 @@
return HandleStatus.HANDLED;
}
};
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
new SimpleString("address1"),
QueueImplTest.queue1,
null,
Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2009-12-10 09:19:17 UTC (rev 8660)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2009-12-10 09:44:05 UTC (rev 8661)
@@ -69,7 +69,7 @@
{
final SimpleString name = new SimpleString("oobblle");
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
name,
null,
@@ -85,7 +85,7 @@
public void testDurable()
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -120,7 +120,7 @@
Consumer cons3 = new FakeConsumer();
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -170,7 +170,7 @@
public void testGetFilter()
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -213,7 +213,7 @@
public void testSimpleadd()
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -241,7 +241,7 @@
public void testSimpleDirectDelivery() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -278,7 +278,7 @@
public void testSimpleNonDirectDelivery() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -325,7 +325,7 @@
public void testBusyConsumer() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -378,7 +378,7 @@
public void testBusyConsumerThenAddMoreMessages() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -454,7 +454,7 @@
public void testAddFirstadd() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -517,7 +517,7 @@
public void testChangeConsumersAndDeliver() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -680,7 +680,7 @@
public void testConsumerReturningNull() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -722,7 +722,7 @@
public void testRoundRobinWithQueueing() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -774,7 +774,7 @@
public void testRoundRobinDirect() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -824,7 +824,7 @@
public void testWithPriorities() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -900,7 +900,7 @@
public void testConsumerWithFilterAddAndRemove()
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -918,7 +918,7 @@
public void testList()
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -951,7 +951,7 @@
public void testListWithFilter()
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -996,7 +996,7 @@
public void testConsumeWithFiltersAddAndRemoveConsumer() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -1074,7 +1074,7 @@
public void testBusyConsumerWithFilterFirstCallBusy() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -1124,7 +1124,7 @@
public void testBusyConsumerWithFilterThenAddMoreMessages() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -1207,7 +1207,7 @@
public void testConsumerWithFilterThenAddMoreMessages() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -1280,7 +1280,7 @@
private void testConsumerWithFilters(final boolean direct) throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -1378,7 +1378,7 @@
public void testMessageOrder() throws Exception
{
FakeConsumer consumer = new FakeConsumer();
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -1407,7 +1407,7 @@
public void testMessagesAdded() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -1428,7 +1428,7 @@
public void testGetReference() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -1450,7 +1450,7 @@
public void testGetNonExistentReference() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -1476,7 +1476,7 @@
*/
public void testPauseAndResumeWithAsync() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -1540,7 +1540,7 @@
public void testPauseAndResumeWithDirect() throws Exception
{
- Queue queue = new QueueImpl(1,
+ QueueImpl queue = new QueueImpl(1,
QueueImplTest.address1,
QueueImplTest.queue1,
null,
@@ -1588,7 +1588,7 @@
class AddtoQueueRunner implements Runnable
{
- Queue queue;
+ QueueImpl queue;
MessageReference messageReference;
@@ -1599,7 +1599,7 @@
boolean first;
public AddtoQueueRunner(final boolean first,
- final Queue queue,
+ final QueueImpl queue,
final MessageReference messageReference,
final CountDownLatch countDownLatch)
{
15 years, 3 months
JBoss hornetq SVN: r8660 - in trunk: src/main/org/hornetq/core/server/impl and 4 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-12-10 04:19:17 -0500 (Thu, 10 Dec 2009)
New Revision: 8660
Modified:
trunk/src/main/org/hornetq/core/journal/impl/SyncSpeedTest.java
trunk/src/main/org/hornetq/core/server/impl/CreditsAvailableRunnable.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerProducerCreditManager.java
trunk/tests/src/org/hornetq/tests/integration/InterceptorTest.java
trunk/tests/src/org/hornetq/tests/integration/client/InVMPersistentMessageBufferTest.java
trunk/tests/src/org/hornetq/tests/integration/client/NettyNonPersistentMessageBufferTest.java
trunk/tests/src/org/hornetq/tests/integration/client/NettyPersistentMessageBufferTest.java
trunk/tests/src/org/hornetq/tests/integration/client/NettyProducerFlowControlTest.java
trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
trunk/tests/src/org/hornetq/tests/integration/paging/NettyPagingSendTest.java
trunk/tests/src/org/hornetq/tests/opt/SendTest.java
Log:
Corrected author header
Modified: trunk/src/main/org/hornetq/core/journal/impl/SyncSpeedTest.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/SyncSpeedTest.java 2009-12-10 09:09:49 UTC (rev 8659)
+++ trunk/src/main/org/hornetq/core/journal/impl/SyncSpeedTest.java 2009-12-10 09:19:17 UTC (rev 8660)
@@ -32,7 +32,7 @@
* This class just provides some diagnostics on how fast your disk can sync
* Useful when determining performance issues
*
- * @author tim fox
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a> fox
*
*
*/
Modified: trunk/src/main/org/hornetq/core/server/impl/CreditsAvailableRunnable.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/CreditsAvailableRunnable.java 2009-12-10 09:09:49 UTC (rev 8659)
+++ trunk/src/main/org/hornetq/core/server/impl/CreditsAvailableRunnable.java 2009-12-10 09:19:17 UTC (rev 8660)
@@ -16,7 +16,7 @@
/**
* A CreditsAvailableRunnable
*
- * @author tim
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*
*
*/
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-12-10 09:09:49 UTC (rev 8659)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-12-10 09:19:17 UTC (rev 8660)
@@ -340,15 +340,15 @@
// TODO - why is this executed on a different thread?
public synchronized void forceDelivery(final long sequence)
- {
- // The prompt delivery is called synchronously to ensure the "forced delivery" message is
- // sent after any queue delivery.
+ {
executor.execute(new Runnable()
{
public void run()
{
try
{
+ // The prompt delivery is called synchronously to ensure the "forced delivery" message is
+ // sent after any queue delivery.
promptDelivery(false);
ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID(), 50);
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerProducerCreditManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerProducerCreditManager.java 2009-12-10 09:09:49 UTC (rev 8659)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerProducerCreditManager.java 2009-12-10 09:19:17 UTC (rev 8660)
@@ -18,7 +18,7 @@
/**
* A ServerProducerCreditManager
*
- * @author tim
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*
*
*/
Modified: trunk/tests/src/org/hornetq/tests/integration/InterceptorTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/InterceptorTest.java 2009-12-10 09:09:49 UTC (rev 8659)
+++ trunk/tests/src/org/hornetq/tests/integration/InterceptorTest.java 2009-12-10 09:19:17 UTC (rev 8660)
@@ -37,7 +37,7 @@
*
* A InterceptorTest
*
- * @author tim fox
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a> fox
*
*
*/
Modified: trunk/tests/src/org/hornetq/tests/integration/client/InVMPersistentMessageBufferTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/InVMPersistentMessageBufferTest.java 2009-12-10 09:09:49 UTC (rev 8659)
+++ trunk/tests/src/org/hornetq/tests/integration/client/InVMPersistentMessageBufferTest.java 2009-12-10 09:19:17 UTC (rev 8660)
@@ -16,7 +16,7 @@
/**
* A InVMPersistentMessageBufferTest
*
- * @author tim
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*
*
*/
Modified: trunk/tests/src/org/hornetq/tests/integration/client/NettyNonPersistentMessageBufferTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/NettyNonPersistentMessageBufferTest.java 2009-12-10 09:09:49 UTC (rev 8659)
+++ trunk/tests/src/org/hornetq/tests/integration/client/NettyNonPersistentMessageBufferTest.java 2009-12-10 09:19:17 UTC (rev 8660)
@@ -16,7 +16,7 @@
/**
* A NettyNonPersistentMessageBufferTest
*
- * @author tim
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*
*
*/
Modified: trunk/tests/src/org/hornetq/tests/integration/client/NettyPersistentMessageBufferTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/NettyPersistentMessageBufferTest.java 2009-12-10 09:09:49 UTC (rev 8659)
+++ trunk/tests/src/org/hornetq/tests/integration/client/NettyPersistentMessageBufferTest.java 2009-12-10 09:19:17 UTC (rev 8660)
@@ -16,7 +16,7 @@
/**
* A NettyPersistentMessageBufferTest
*
- * @author tim
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*
*
*/
Modified: trunk/tests/src/org/hornetq/tests/integration/client/NettyProducerFlowControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/NettyProducerFlowControlTest.java 2009-12-10 09:09:49 UTC (rev 8659)
+++ trunk/tests/src/org/hornetq/tests/integration/client/NettyProducerFlowControlTest.java 2009-12-10 09:19:17 UTC (rev 8660)
@@ -16,7 +16,7 @@
/**
* A NettyProducerFlowControlTest
*
- * @author tim
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*
*
*/
Modified: trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java 2009-12-10 09:09:49 UTC (rev 8659)
+++ trunk/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java 2009-12-10 09:19:17 UTC (rev 8660)
@@ -39,7 +39,7 @@
*
* A ProducerFlowControlTest
*
- * @author tim fox
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a> fox
*
*/
public class ProducerFlowControlTest extends ServiceTestBase
Modified: trunk/tests/src/org/hornetq/tests/integration/paging/NettyPagingSendTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/paging/NettyPagingSendTest.java 2009-12-10 09:09:49 UTC (rev 8659)
+++ trunk/tests/src/org/hornetq/tests/integration/paging/NettyPagingSendTest.java 2009-12-10 09:19:17 UTC (rev 8660)
@@ -16,7 +16,7 @@
/**
* A NettyPagingSendTest
*
- * @author tim
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*
*
*/
Modified: trunk/tests/src/org/hornetq/tests/opt/SendTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/opt/SendTest.java 2009-12-10 09:09:49 UTC (rev 8659)
+++ trunk/tests/src/org/hornetq/tests/opt/SendTest.java 2009-12-10 09:19:17 UTC (rev 8660)
@@ -48,7 +48,7 @@
/**
* A SendTest
*
- * @author tim
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*
*
*/
15 years, 3 months
JBoss hornetq SVN: r8659 - trunk/tests/src/org/hornetq/tests/integration/cluster/reattach.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-12-10 04:09:49 -0500 (Thu, 10 Dec 2009)
New Revision: 8659
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java
Log:
number of its to 2 since we now have stress tests for this
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java 2009-12-10 09:04:09 UTC (rev 8658)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java 2009-12-10 09:09:49 UTC (rev 8659)
@@ -1173,7 +1173,7 @@
protected int getNumIterations()
{
- return 5;
+ return 2;
}
@Override
15 years, 3 months
JBoss hornetq SVN: r8658 - in trunk: src/main/org/hornetq/core/journal/impl and 2 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-12-10 04:04:09 -0500 (Thu, 10 Dec 2009)
New Revision: 8658
Modified:
trunk/examples/core/perf/server0/hornetq-configuration.xml
trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java
Log:
simplified timed buffer fix + fix extra hang in receiveimmediate code
Modified: trunk/examples/core/perf/server0/hornetq-configuration.xml
===================================================================
--- trunk/examples/core/perf/server0/hornetq-configuration.xml 2009-12-09 21:45:29 UTC (rev 8657)
+++ trunk/examples/core/perf/server0/hornetq-configuration.xml 2009-12-10 09:04:09 UTC (rev 8658)
@@ -17,7 +17,7 @@
<persistence-enabled>true</persistence-enabled>
- <journal-sync-non-transactional>false</journal-sync-non-transactional>
+ <journal-sync-non-transactional>true</journal-sync-non-transactional>
<journal-sync-transactional>true</journal-sync-transactional>
<journal-type>ASYNCIO</journal-type>
<journal-min-files>20</journal-min-files>
Modified: trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-12-09 21:45:29 UTC (rev 8657)
+++ trunk/src/main/org/hornetq/core/journal/impl/TimedBuffer.java 2009-12-10 09:04:09 UTC (rev 8658)
@@ -20,7 +20,7 @@
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.hornetq.core.buffers.HornetQBuffer;
@@ -47,8 +47,13 @@
private TimedBufferObserver bufferObserver;
- private CheckTimer timer;
+ // If the TimedBuffer is idle - i.e. no records are being added, then it's pointless the timer flush thread
+ // in spinning and checking the time - and using up CPU in the process - this semaphore is used to
+ // prevent that
+ private final Semaphore spinLimiter = new Semaphore(1);
+ private CheckTimer timerRunnable = new CheckTimer();
+
private final int bufferSize;
private final HornetQBuffer buffer;
@@ -85,6 +90,8 @@
private final AtomicLong lastFlushTime = new AtomicLong(0);
+ private boolean spinning = false;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -121,9 +128,9 @@
return;
}
- timer = new CheckTimer();
+ timerRunnable = new CheckTimer();
- timerThread = new Thread(timer, "hornetq-buffer-timeout");
+ timerThread = new Thread(timerRunnable, "hornetq-buffer-timeout");
timerThread.start();
@@ -134,6 +141,15 @@
logRatesTimer.scheduleAtFixedRate(logRatesTimerTask, 2000, 2000);
}
+ // Need to start with the spin limiter acquired
+ try
+ {
+ spinLimiter.acquire();
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+
started = true;
}
@@ -148,8 +164,10 @@
bufferObserver = null;
- timer.stop();
+ spinLimiter.release();
+ timerRunnable.close();
+
if (logRates)
{
logRatesTimerTask.cancel();
@@ -232,7 +250,6 @@
public synchronized void addBytes(final EncodingSupport bytes, final boolean sync, final IOAsyncTask callback)
{
-
delayFlush = false;
bytes.encode(buffer);
@@ -252,69 +269,78 @@
// flush();
// }
- timer.resumeSpin();
+ if (!spinning)
+ {
+ spinLimiter.release();
+
+ spinning = true;
+ }
}
}
-
- /**
- * This method will verify if it a flush is required.
- * It is called directly by the CheckTimer.
- *
- * @return true means you can pause spinning for a while
- * */
- private synchronized boolean checkFlush()
+ public void flush()
{
- // delayFlush and pendingSync are changed inside synchronized blocks
- // They need to be done atomically
- if (!delayFlush && pendingSync && bufferObserver != null)
- {
- flush();
- return true;
- }
- else return !delayFlush;
+ flush(false);
}
-
+
/**
- * Note: Flush could be called by either the checkFlush (and timer), or by the Journal directly before moving to a new file
+ * force means the Journal is moving to a new file. Any pending write need to be done immediately
+ * or data could be lost
* */
- public synchronized void flush()
+ public void flush(final boolean force)
{
- if (buffer.writerIndex() > 0)
+ synchronized (this)
{
- int pos = buffer.writerIndex();
-
- if (logRates)
+ if ((force || !delayFlush) && buffer.writerIndex() > 0)
{
- bytesFlushed.addAndGet(pos);
- }
+ int pos = buffer.writerIndex();
- ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos);
+ if (logRates)
+ {
+ bytesFlushed.addAndGet(pos);
+ }
- // Putting a byteArray on a native buffer is much faster, since it will do in a single native call.
- // Using bufferToFlush.put(buffer) would make several append calls for each byte
+ ByteBuffer bufferToFlush = bufferObserver.newBuffer(bufferSize, pos);
- bufferToFlush.put(buffer.toByteBuffer().array(), 0, pos);
+ // Putting a byteArray on a native buffer is much faster, since it will do in a single native call.
+ // Using bufferToFlush.put(buffer) would make several append calls for each byte
- if (bufferToFlush != null)
- {
- bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks);
- }
+ bufferToFlush.put(buffer.toByteBuffer().array(), 0, pos);
- lastFlushTime.set(System.nanoTime());
+ if (bufferToFlush != null)
+ {
+ bufferObserver.flushBuffer(bufferToFlush, pendingSync, callbacks);
+ }
- pendingSync = false;
+ if (spinning)
+ {
+ try
+ {
+ // We acquire the spinLimiter semaphore - this prevents the timer flush thread unnecessarily spinning
+ // when the buffer is inactive
+ spinLimiter.acquire();
+ }
+ catch (InterruptedException e)
+ {
+ // Ignore
+ }
- callbacks = new LinkedList<IOAsyncTask>();
+ spinning = false;
+ }
- buffer.clear();
+ lastFlushTime.set(System.nanoTime());
- bufferLimit = 0;
+ pendingSync = false;
- flushesDone.incrementAndGet();
+ callbacks = new LinkedList<IOAsyncTask>();
- timer.pauseSpin();
+ buffer.clear();
+
+ bufferLimit = 0;
+
+ flushesDone.incrementAndGet();
+ }
}
}
@@ -376,69 +402,19 @@
private class CheckTimer implements Runnable
{
- private volatile boolean stopped = false;
+ private volatile boolean closed = false;
- private boolean spinning = false;
-
- // If the TimedBuffer is idle - i.e. no records are being added, then it's pointless the timer flush thread
- // in spinning and checking the time - and using up CPU in the process - this semaphore is used to
- // prevent that
- private final Semaphore spinLimiter = new Semaphore(1);
-
- public CheckTimer()
- {
- if (!spinLimiter.tryAcquire())
- {
- // JDK would be screwed up if this was happening
- throw new IllegalStateException("InternalError: Semaphore not working properly!");
- }
- spinning = false;
- }
-
- // Needs to be called within synchronized blocks on TimedBuffer
- public void resumeSpin()
- {
- spinning = true;
- spinLimiter.release();
- }
-
- // Needs to be called within synchronized blocks on TimedBuffer
- public void pauseSpin()
- {
- if (spinning)
- {
- spinning = false;
- try
- {
- if (!spinLimiter.tryAcquire(60, TimeUnit.SECONDS))
- {
- throw new IllegalStateException("Internal error on TimedBuffer. Can't stop spinning");
- }
- }
- catch (InterruptedException ignored)
- {
- }
- }
- }
-
public void run()
{
- while (!stopped)
+ while (!closed)
{
// We flush on the timer if there are pending syncs there and we've waited waited at least one
// timeout since the time of the last flush
// Effectively flushing "resets" the timer
- if (System.nanoTime() > lastFlushTime.get() + timeout)
+ if (pendingSync && bufferObserver != null && System.nanoTime() > lastFlushTime.get() + timeout)
{
- if (checkFlush())
- {
- if (!stopped)
- {
- // can't pause spin if stopped, or we would hang the thread
- pauseSpin();
- }
- }
+ flush();
}
try
@@ -455,10 +431,9 @@
}
}
- public void stop()
+ public void close()
{
- stopped = true;
- resumeSpin();
+ closed = true;
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-12-09 21:45:29 UTC (rev 8657)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-12-10 09:04:09 UTC (rev 8658)
@@ -48,6 +48,7 @@
import org.hornetq.core.server.ServerSession;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.impl.TransactionImpl;
+import org.hornetq.utils.Future;
import org.hornetq.utils.TypedProperties;
/**
@@ -120,10 +121,9 @@
private final ManagementService managementService;
private final Binding binding;
-
+
private boolean transferring = false;
-
// Constructors ---------------------------------------------------------------------------------
public ServerConsumerImpl(final long id,
@@ -201,7 +201,7 @@
// should go back into the
// queue for delivery later.
if (!started || transferring)
- {
+ {
return HandleStatus.BUSY;
}
@@ -416,25 +416,59 @@
promptDelivery(true);
}
}
-
+
public void setTransferring(final boolean transferring)
{
lock.lock();
try
{
this.transferring = transferring;
+
+ if (transferring)
+ {
+ // Now we must wait for any large message delivery to finish
+ while (largeMessageInDelivery)
+ {
+ try
+ {
+ Thread.sleep(1);
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+ }
+ }
}
finally
{
lock.unlock();
}
-
+
+ //Outside the lock
+ if (transferring)
+ {
+ // And we must wait for any force delivery to be executed - this is executed async so we add a future to the
+ // executor and
+ // wait for it to complete
+
+ Future future = new Future();
+
+ executor.execute(future);
+
+ boolean ok = future.await(10000);
+
+ if (!ok)
+ {
+ log.warn("Timed out waiting for executor to complete");
+ }
+ }
+
if (!transferring)
{
promptDelivery(true);
}
}
-
+
public void receiveCredits(final int credits) throws Exception
{
if (credits == -1)
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java 2009-12-09 21:45:29 UTC (rev 8657)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/RandomReattachTest.java 2009-12-10 09:04:09 UTC (rev 8658)
@@ -742,7 +742,7 @@
for (MyHandler handler : handlers)
{
- boolean ok = handler.latch.await(10000, TimeUnit.MILLISECONDS);
+ boolean ok = handler.latch.await(20000, TimeUnit.MILLISECONDS);
Assert.assertTrue(ok);
}
15 years, 3 months