[hornetq-commits] JBoss hornetq SVN: r10197 - branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Feb 10 14:30:34 EST 2011
Author: clebert.suconic at jboss.com
Date: 2011-02-10 14:30:34 -0500 (Thu, 10 Feb 2011)
New Revision: 10197
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
Log:
JBPAPP-5895 - Caching object creating for after the server was activated
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2011-02-10 10:25:47 UTC (rev 10196)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2011-02-10 19:30:34 UTC (rev 10197)
@@ -111,6 +111,9 @@
private final Map<String, List<String>> connectionFactoryJNDI = new HashMap<String, List<String>>();
+ // We keep things cached if objects are created while the JMS is not active
+ private final List<Runnable> cachedCommands = new ArrayList<Runnable>();
+
private final HornetQServer server;
private JMSManagementService jmsManagementService;
@@ -211,6 +214,14 @@
{
deploy();
}
+
+ for (Runnable run: cachedCommands)
+ {
+ log.info("Running cached command for " + run);
+ run.run();
+ }
+
+ cachedCommands.clear();
}
catch (Exception e)
@@ -230,7 +241,8 @@
if (registry == null)
{
- if (!contextSet) context = new InitialContext();
+ if (!contextSet)
+ context = new InitialContext();
registry = new JndiBindingRegistry(context);
}
@@ -355,7 +367,7 @@
public synchronized void setContext(final Context context)
{
this.context = context;
-
+
if (registry != null && registry instanceof JndiBindingRegistry)
{
registry.setContext(context);
@@ -377,85 +389,106 @@
final boolean durable,
final String... jndi) throws Exception
{
- checkInitialised();
- boolean added = internalCreateQueue(queueName, selectorString, durable);
-
- if (!added)
+ if (active && queues.get(queueName) != null)
{
return false;
}
- HornetQDestination destination = queues.get(queueName);
- if (destination == null)
+ runAfterActive(new RunnableException()
{
- // sanity check. internalCreateQueue should already have done this check
- throw new IllegalArgumentException("Queue does not exist");
- }
-
- ArrayList<String> bindings = new ArrayList<String>();
-
- for (String jndiItem : jndi)
- {
- if (bindToJndi(jndiItem, destination))
+ public String toString()
{
- bindings.add(jndiItem);
+ return "createQueue for " + queueName;
}
- }
+
+ public void runException() throws Exception
+ {
+ if (internalCreateQueue(queueName, selectorString, durable))
+ {
- String[] usedJNDI = bindings.toArray(new String[bindings.size()]);
- addToBindings(queueJNDI, queueName, usedJNDI);
+ HornetQDestination destination = queues.get(queueName);
+ if (destination == null)
+ {
+ // sanity check. internalCreateQueue should already have done this check
+ throw new IllegalArgumentException("Queue does not exist");
+ }
- if (storeConfig && durable)
- {
- storage.storeDestination(new PersistedDestination(PersistedType.Queue, queueName, selectorString, durable));
- storage.addJNDI(PersistedType.Queue, queueName, usedJNDI);
- }
+ ArrayList<String> bindings = new ArrayList<String>();
- return added;
+ for (String jndiItem : jndi)
+ {
+ if (bindToJndi(jndiItem, destination))
+ {
+ bindings.add(jndiItem);
+ }
+ }
+
+ String[] usedJNDI = bindings.toArray(new String[bindings.size()]);
+ addToBindings(queueJNDI, queueName, usedJNDI);
+
+ if (storeConfig && durable)
+ {
+ storage.storeDestination(new PersistedDestination(PersistedType.Queue, queueName, selectorString, durable));
+ storage.addJNDI(PersistedType.Queue, queueName, usedJNDI);
+ }
+ }
+ }
+ });
+
+ return true;
}
public synchronized boolean createTopic(final boolean storeConfig, final String topicName, final String... jndi) throws Exception
{
- checkInitialised();
-
- boolean added = internalCreateTopic(topicName);
-
- if (!added)
+ if (active && topics.get(topicName) != null)
{
return false;
}
- else
+
+ runAfterActive(new RunnableException()
{
- HornetQDestination destination = topics.get(topicName);
-
- if (destination == null)
+ public String toString()
{
- // sanity check. internalCreateQueue should already have done this check
- throw new IllegalArgumentException("Queue does not exist");
+ return "createTopic for " + topicName;
}
+
+ public void runException() throws Exception
+ {
+ if (internalCreateTopic(topicName))
+ {
+ HornetQDestination destination = topics.get(topicName);
- ArrayList<String> bindings = new ArrayList<String>();
+ if (destination == null)
+ {
+ // sanity check. internalCreateQueue should already have done this check
+ throw new IllegalArgumentException("Queue does not exist");
+ }
- for (String jndiItem : jndi)
- {
- if (bindToJndi(jndiItem, destination))
- {
- bindings.add(jndiItem);
+ ArrayList<String> bindings = new ArrayList<String>();
+
+ for (String jndiItem : jndi)
+ {
+ if (bindToJndi(jndiItem, destination))
+ {
+ bindings.add(jndiItem);
+ }
+ }
+
+ String[] usedJNDI = bindings.toArray(new String[bindings.size()]);
+ addToBindings(topicJNDI, topicName, usedJNDI);
+
+ if (storeConfig)
+ {
+ storage.storeDestination(new PersistedDestination(PersistedType.Topic, topicName));
+ storage.addJNDI(PersistedType.Topic, topicName, usedJNDI);
+ }
}
}
+ });
- String[] usedJNDI = bindings.toArray(new String[bindings.size()]);
- addToBindings(topicJNDI, topicName, usedJNDI);
+ return true;
- if (storeConfig)
- {
- storage.storeDestination(new PersistedDestination(PersistedType.Topic, topicName));
- storage.addJNDI(PersistedType.Topic, topicName, usedJNDI);
- }
-
- return true;
- }
}
public boolean addTopicToJndi(final String topicName, final String jndiBinding) throws Exception
@@ -529,7 +562,8 @@
}
if (registry.lookup(jndiBinding) != null)
{
- throw new HornetQException(HornetQException.ADDRESS_EXISTS, "JNDI " + name + " is already being used by another connection factory");
+ throw new HornetQException(HornetQException.ADDRESS_EXISTS, "JNDI " + name +
+ " is already being used by another connection factory");
}
boolean added = bindToJndi(jndiBinding, factory);
if (added)
@@ -710,9 +744,7 @@
HornetQConnectionFactory cf = connectionFactories.get(name);
if (cf == null)
{
- ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl(name,
- ha,
- connectorNames);
+ ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl(name, ha, connectorNames);
configuration.setFactoryType(cfType);
createConnectionFactory(true, configuration, jndiBindings);
}
@@ -720,7 +752,7 @@
public synchronized void createConnectionFactory(final String name,
final boolean ha,
- JMSFactoryType cfType,
+ JMSFactoryType cfType,
final List<String> connectorNames,
final String clientID,
final long clientFailureCheckPeriod,
@@ -757,9 +789,7 @@
HornetQConnectionFactory cf = connectionFactories.get(name);
if (cf == null)
{
- ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl(name,
- ha,
- connectorNames);
+ ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl(name, ha, connectorNames);
configuration.setClientID(clientID);
configuration.setClientFailureCheckPeriod(clientFailureCheckPeriod);
configuration.setConnectionTTL(connectionTTL);
@@ -790,7 +820,7 @@
configuration.setReconnectAttempts(reconnectAttempts);
configuration.setFailoverOnInitialConnection(failoverOnInitialConnection);
configuration.setGroupID(groupId);
- createConnectionFactory(true, configuration, jndiBindings);
+ createConnectionFactory(true, configuration, jndiBindings);
}
}
@@ -833,9 +863,7 @@
HornetQConnectionFactory cf = connectionFactories.get(name);
if (cf == null)
{
- ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl(name,
- ha,
- jndiBindings);
+ ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl(name, ha, jndiBindings);
configuration.setDiscoveryGroupName(discoveryGroupName);
configuration.setFactoryType(cfType);
configuration.setClientID(clientID);
@@ -880,9 +908,7 @@
HornetQConnectionFactory cf = connectionFactories.get(name);
if (cf == null)
{
- ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl(name,
- ha,
- jndiBindings);
+ ConnectionFactoryConfiguration configuration = new ConnectionFactoryConfigurationImpl(name, ha, jndiBindings);
configuration.setDiscoveryGroupName(discoveryGroupName);
createConnectionFactory(true, configuration, jndiBindings);
}
@@ -890,35 +916,50 @@
public synchronized void createConnectionFactory(final boolean storeConfig,
final ConnectionFactoryConfiguration cfConfig,
- String... jndi) throws Exception
+ final String... jndi) throws Exception
{
- HornetQConnectionFactory cf = internalCreateCF(cfConfig);
-
- ArrayList<String> bindings = new ArrayList<String>();
-
- for (String jndiItem : jndi)
+ runAfterActive(new RunnableException()
{
- if (bindToJndi(jndiItem, cf))
+
+ public String toString()
{
- bindings.add(jndiItem);
+ return "createConnectionFactory for " + cfConfig.getName();
}
- }
+
- String[] usedJNDI = bindings.toArray(new String[bindings.size()]);
- addToBindings(connectionFactoryJNDI, cfConfig.getName(), usedJNDI);
+ public void runException() throws Exception
+ {
- if (storeConfig)
- {
- storage.storeConnectionFactory(new PersistedConnectionFactory(cfConfig));
- storage.addJNDI(PersistedType.ConnectionFactory, cfConfig.getName(), usedJNDI);
- }
+ HornetQConnectionFactory cf = internalCreateCF(cfConfig);
+
+ ArrayList<String> bindings = new ArrayList<String>();
+
+ for (String jndiItem : jndi)
+ {
+ if (bindToJndi(jndiItem, cf))
+ {
+ bindings.add(jndiItem);
+ }
+ }
+
+ String[] usedJNDI = bindings.toArray(new String[bindings.size()]);
+ addToBindings(connectionFactoryJNDI, cfConfig.getName(), usedJNDI);
+
+ if (storeConfig)
+ {
+ storage.storeConnectionFactory(new PersistedConnectionFactory(cfConfig));
+ storage.addJNDI(PersistedType.ConnectionFactory, cfConfig.getName(), usedJNDI);
+ }
+
+ }
+ });
}
-
+
public JMSStorageManager getJMSStorageManager()
{
return storage;
}
-
+
// used on tests only
public void replaceStorageManager(JMSStorageManager newStorage)
{
@@ -1023,11 +1064,15 @@
{
if (cfConfig.getDiscoveryGroupName() != null)
{
- DiscoveryGroupConfiguration groupConfig = server.getConfiguration().getDiscoveryGroupConfigurations().get(cfConfig.getDiscoveryGroupName());
-
+ DiscoveryGroupConfiguration groupConfig = server.getConfiguration()
+ .getDiscoveryGroupConfigurations()
+ .get(cfConfig.getDiscoveryGroupName());
+
if (groupConfig == null)
{
- throw new HornetQException(HornetQException.ILLEGAL_STATE, "Discovery Group '" + cfConfig.getDiscoveryGroupName() + "' doesn't exist on maing config");
+ throw new HornetQException(HornetQException.ILLEGAL_STATE,
+ "Discovery Group '" + cfConfig.getDiscoveryGroupName() +
+ "' doesn't exist on maing config");
}
if (cfConfig.isHA())
@@ -1043,22 +1088,24 @@
{
if (cfConfig.getConnectorNames() == null || cfConfig.getConnectorNames().size() == 0)
{
- throw new HornetQException(HornetQException.ILLEGAL_STATE, "Null Connector name passed to create ConnectionFactory");
+ throw new HornetQException(HornetQException.ILLEGAL_STATE,
+ "Null Connector name passed to create ConnectionFactory");
}
-
+
TransportConfiguration[] configs = new TransportConfiguration[cfConfig.getConnectorNames().size()];
-
+
int count = 0;
for (String name : cfConfig.getConnectorNames())
{
- TransportConfiguration connector = server.getConfiguration().getConnectorConfigurations().get(name);
+ TransportConfiguration connector = server.getConfiguration().getConnectorConfigurations().get(name);
if (connector == null)
{
- throw new HornetQException(HornetQException.ILLEGAL_STATE, "Connector '" + name + "' not found on the main configuration file");
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "Connector '" + name +
+ "' not found on the main configuration file");
}
configs[count++] = connector;
}
-
+
if (cfConfig.isHA())
{
cf = HornetQJMSClient.createConnectionFactoryWithHA(cfConfig.getFactoryType(), configs);
@@ -1099,7 +1146,7 @@
cf.setCompressLargeMessage(cfConfig.isCompressLargeMessages());
cf.setGroupID(cfConfig.getGroupID());
}
-
+
connectionFactories.put(cfConfig.getName(), cf);
jmsManagementService.registerConnectionFactory(cfConfig.getName(), cf);
@@ -1131,7 +1178,7 @@
{
return false;
}
-
+
if (registry != null)
{
for (String jndiBinding : jndiBindings)
@@ -1144,7 +1191,7 @@
connectionFactories.remove(name);
jmsManagementService.unregisterConnectionFactory(name);
-
+
return true;
}
@@ -1176,17 +1223,16 @@
checkInitialised();
return server.getHornetQServerControl().listSessions(connectionID);
}
-
-
+
public String listPreparedTransactionDetailsAsJSON() throws Exception
{
ResourceManager resourceManager = server.getResourceManager();
Map<Xid, Long> xids = resourceManager.getPreparedTransactionsWithCreationTime();
- if(xids == null || xids.size()==0)
+ if (xids == null || xids.size() == 0)
{
return "";
}
-
+
ArrayList<Entry<Xid, Long>> xidsSortedByCreationTime = new ArrayList<Map.Entry<Xid, Long>>(xids.entrySet());
Collections.sort(xidsSortedByCreationTime, new Comparator<Entry<Xid, Long>>()
{
@@ -1196,24 +1242,22 @@
return (int)(entry1.getValue() - entry2.getValue());
}
});
-
+
JSONArray txDetailListJson = new JSONArray();
for (Map.Entry<Xid, Long> entry : xidsSortedByCreationTime)
{
Xid xid = entry.getKey();
- TransactionDetail detail = new JMSTransactionDetail(xid,
- resourceManager.getTransaction(xid),
- entry.getValue());
+ TransactionDetail detail = new JMSTransactionDetail(xid, resourceManager.getTransaction(xid), entry.getValue());
txDetailListJson.put(detail.toJSON());
}
return txDetailListJson.toString();
}
-
+
public String listPreparedTransactionDetailsAsHTML() throws Exception
{
ResourceManager resourceManager = server.getResourceManager();
Map<Xid, Long> xids = resourceManager.getPreparedTransactionsWithCreationTime();
- if(xids == null || xids.size() == 0)
+ if (xids == null || xids.size() == 0)
{
return "<h3>*** Prepared Transaction Details ***</h3><p>No entry.</p>";
}
@@ -1227,18 +1271,16 @@
return (int)(entry1.getValue() - entry2.getValue());
}
});
-
+
StringBuilder html = new StringBuilder();
html.append("<h3>*** Prepared Transaction Details ***</h3>");
-
+
for (Map.Entry<Xid, Long> entry : xidsSortedByCreationTime)
{
Xid xid = entry.getKey();
- TransactionDetail detail = new JMSTransactionDetail(xid,
- resourceManager.getTransaction(xid),
- entry.getValue());
+ TransactionDetail detail = new JMSTransactionDetail(xid, resourceManager.getTransaction(xid), entry.getValue());
JSONObject txJson = detail.toJSON();
-
+
html.append("<table border=\"1\">");
html.append("<tr><th>creation_time</th>");
html.append("<td>" + txJson.get(TransactionDetail.KEY_CREATION_TIME) + "</td>");
@@ -1256,14 +1298,14 @@
html.append("<table border=\"1\" cellspacing=\"0\" cellpadding=\"0\">");
JSONArray msgs = txJson.getJSONArray(TransactionDetail.KEY_TX_RELATED_MESSAGES);
- for(int i=0; i<msgs.length(); i++)
- {
+ for (int i = 0; i < msgs.length(); i++)
+ {
JSONObject msgJson = msgs.getJSONObject(i);
JSONObject props = msgJson.getJSONObject(TransactionDetail.KEY_MSG_PROPERTIES);
StringBuilder propstr = new StringBuilder();
@SuppressWarnings("unchecked")
Iterator<String> propkeys = props.keys();
- while(propkeys.hasNext())
+ while (propkeys.hasNext())
{
String key = propkeys.next();
propstr.append(key);
@@ -1271,7 +1313,7 @@
propstr.append(props.get(key));
propstr.append(", ");
}
-
+
html.append("<th>operation_type</th>");
html.append("<td>" + msgJson.get(TransactionDetail.KEY_MSG_OP_TYPE) + "</th>");
html.append("<th>message_type</th>");
@@ -1284,12 +1326,10 @@
html.append("</table></td></tr>");
html.append("</table><br/>");
}
-
+
return html.toString();
}
-
-
// Public --------------------------------------------------------
// Private -------------------------------------------------------
@@ -1536,5 +1576,39 @@
return false;
}
}
+
+ private boolean runAfterActive(RunnableException runnable) throws Exception
+ {
+ if (active)
+ {
+ runnable.runException();
+ return true;
+ }
+ else
+ {
+ log.info("Caching command for " + runnable + " since the JMS Server is not active yet");
+ cachedCommands.add(runnable);
+ return false;
+ }
+ }
+
+
+ private abstract class RunnableException implements Runnable
+ {
+ public void run()
+ {
+ try
+ {
+ runException();
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ }
+
+ public abstract void runException() throws Exception;
+ }
+
}
More information about the hornetq-commits
mailing list