[hornetq-commits] JBoss hornetq SVN: r8542 - in trunk: src/main/org/hornetq/core/persistence/impl/journal and 1 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Dec 3 15:20:29 EST 2009
Author: clebert.suconic at jboss.com
Date: 2009-12-03 15:20:29 -0500 (Thu, 03 Dec 2009)
New Revision: 8542
Modified:
trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
Log:
Fix for JMS Tests
Modified: trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2009-12-03 20:10:15 UTC (rev 8541)
+++ trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2009-12-03 20:20:29 UTC (rev 8542)
@@ -59,19 +59,18 @@
private final PostOffice postOffice;
private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
-
+
private final StorageManager storageManager;
private MessageCounter counter;
// Static --------------------------------------------------------
- private static String toJSON(Map<String, Object>[] messages)
+ private static String toJSON(final Map<String, Object>[] messages)
{
JSONArray array = new JSONArray();
- for (int i = 0; i < messages.length; i++)
+ for (Map<String, Object> message : messages)
{
- Map<String, Object> message = messages[i];
array.put(new JSONObject(message));
}
return array.toString();
@@ -95,7 +94,7 @@
// Public --------------------------------------------------------
- public void setMessageCounter(MessageCounter counter)
+ public void setMessageCounter(final MessageCounter counter)
{
this.counter = counter;
}
@@ -104,7 +103,15 @@
public String getName()
{
- return queue.getName().toString();
+ clearIO();
+ try
+ {
+ return queue.getName().toString();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public String getAddress()
@@ -114,123 +121,244 @@
public String getFilter()
{
- Filter filter = queue.getFilter();
+ clearIO();
+ try
+ {
+ Filter filter = queue.getFilter();
- return (filter != null) ? filter.getFilterString().toString() : null;
+ return filter != null ? filter.getFilterString().toString() : null;
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public boolean isDurable()
{
- return queue.isDurable();
+ clearIO();
+ try
+ {
+ return queue.isDurable();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public boolean isTemporary()
{
- return queue.isTemporary();
+ clearIO();
+ try
+ {
+ return queue.isTemporary();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public int getMessageCount()
{
- return queue.getMessageCount();
+ clearIO();
+ try
+ {
+ return queue.getMessageCount();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public int getConsumerCount()
{
- return queue.getConsumerCount();
+ clearIO();
+ try
+ {
+ return queue.getConsumerCount();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public int getDeliveringCount()
{
- return queue.getDeliveringCount();
+ clearIO();
+ try
+ {
+ return queue.getDeliveringCount();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public int getMessagesAdded()
{
- return queue.getMessagesAdded();
+ clearIO();
+ try
+ {
+ return queue.getMessagesAdded();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public long getID()
{
- return queue.getID();
+ clearIO();
+ try
+ {
+ return queue.getID();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public long getScheduledCount()
{
- return queue.getScheduledCount();
+ clearIO();
+ try
+ {
+ return queue.getScheduledCount();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public String getDeadLetterAddress()
{
- AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
+ clearIO();
+ try
+ {
+ AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
- if (addressSettings != null && addressSettings.getDeadLetterAddress() != null)
- {
- return addressSettings.getDeadLetterAddress().toString();
+ if (addressSettings != null && addressSettings.getDeadLetterAddress() != null)
+ {
+ return addressSettings.getDeadLetterAddress().toString();
+ }
+ else
+ {
+ return null;
+ }
}
- else
+ finally
{
- return null;
+ blockOnIO();
}
}
public void setDeadLetterAddress(final String deadLetterAddress) throws Exception
{
- AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
+ clearIO();
+ try
+ {
+ AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
- if (deadLetterAddress != null)
+ if (deadLetterAddress != null)
+ {
+ addressSettings.setDeadLetterAddress(new SimpleString(deadLetterAddress));
+ }
+ }
+ finally
{
- addressSettings.setDeadLetterAddress(new SimpleString(deadLetterAddress));
+ blockOnIO();
}
}
public String getExpiryAddress()
{
- AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
+ clearIO();
+ try
+ {
+ AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
- if (addressSettings != null && addressSettings.getExpiryAddress() != null)
- {
- return addressSettings.getExpiryAddress().toString();
+ if (addressSettings != null && addressSettings.getExpiryAddress() != null)
+ {
+ return addressSettings.getExpiryAddress().toString();
+ }
+ else
+ {
+ return null;
+ }
}
- else
+ finally
{
- return null;
+ blockOnIO();
}
}
public void setExpiryAddress(final String expiryAddress) throws Exception
{
- AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
+ clearIO();
+ try
+ {
+ AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
- SimpleString sExpiryAddress = new SimpleString(expiryAddress);
+ SimpleString sExpiryAddress = new SimpleString(expiryAddress);
- if (expiryAddress != null)
+ if (expiryAddress != null)
+ {
+ addressSettings.setExpiryAddress(sExpiryAddress);
+ }
+
+ queue.setExpiryAddress(sExpiryAddress);
+ }
+ finally
{
- addressSettings.setExpiryAddress(sExpiryAddress);
+ blockOnIO();
}
-
- queue.setExpiryAddress(sExpiryAddress);
}
public Map<String, Object>[] listScheduledMessages() throws Exception
{
- List<MessageReference> refs = queue.getScheduledMessages();
- Map<String, Object>[] messages = new Map[refs.size()];
- int i = 0;
- for (MessageReference ref : refs)
+ clearIO();
+ try
{
- Message message = ref.getMessage();
- messages[i++] = message.toMap();
+ List<MessageReference> refs = queue.getScheduledMessages();
+ Map<String, Object>[] messages = new Map[refs.size()];
+ int i = 0;
+ for (MessageReference ref : refs)
+ {
+ Message message = ref.getMessage();
+ messages[i++] = message.toMap();
+ }
+ return messages;
}
- return messages;
+ finally
+ {
+ blockOnIO();
+ }
}
public String listScheduledMessagesAsJSON() throws Exception
{
- return toJSON(listScheduledMessages());
+ clearIO();
+ try
+ {
+ return toJSON(listScheduledMessages());
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public Map<String, Object>[] listMessages(final String filterStr) throws Exception
{
+ clearIO();
try
{
Filter filter = FilterImpl.createFilter(filterStr);
@@ -248,22 +376,43 @@
{
throw new IllegalStateException(e.getMessage());
}
+ finally
+ {
+ blockOnIO();
+ }
}
- public String listMessagesAsJSON(String filter) throws Exception
+ public String listMessagesAsJSON(final String filter) throws Exception
{
- return toJSON(listMessages(filter));
+ clearIO();
+ try
+ {
+ return toJSON(listMessages(filter));
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public int countMessages(final String filterStr) throws Exception
{
- Filter filter = FilterImpl.createFilter(filterStr);
- List<MessageReference> refs = queue.list(filter);
- return refs.size();
+ clearIO();
+ try
+ {
+ Filter filter = FilterImpl.createFilter(filterStr);
+ List<MessageReference> refs = queue.list(filter);
+ return refs.size();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public boolean removeMessage(final long messageID) throws Exception
{
+ clearIO();
try
{
return queue.deleteReference(messageID);
@@ -272,138 +421,185 @@
{
throw new IllegalStateException(e.getMessage());
}
+ finally
+ {
+ blockOnIO();
+ }
}
public int removeMessages(final String filterStr) throws Exception
{
- Filter filter = FilterImpl.createFilter(filterStr);
-
- int retValue = queue.deleteMatchingReferences(filter);
-
- // Waiting on IO otherwise the operation would return before the operation completed
- storageManager.waitOnOperations();
-
- return retValue;
+ clearIO();
+ try
+ {
+ Filter filter = FilterImpl.createFilter(filterStr);
+
+ return queue.deleteMatchingReferences(filter);
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public boolean expireMessage(final long messageID) throws Exception
{
- boolean retValue =queue.expireReference(messageID);
-
- // Waiting on IO otherwise the operation would return before the operation completed
- storageManager.waitOnOperations();
-
- return retValue;
+ clearIO();
+ try
+ {
+ return queue.expireReference(messageID);
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public int expireMessages(final String filterStr) throws Exception
{
+ clearIO();
try
{
Filter filter = FilterImpl.createFilter(filterStr);
- int retValue = queue.expireReferences(filter);
-
- // Waiting on IO otherwise the operation would return before the operation completed
- storageManager.waitOnOperations();
-
- return retValue;
+ return queue.expireReferences(filter);
}
catch (HornetQException e)
{
throw new IllegalStateException(e.getMessage());
}
+ finally
+ {
+ blockOnIO();
+ }
}
public boolean moveMessage(final long messageID, final String otherQueueName) throws Exception
{
- Binding binding = postOffice.getBinding(new SimpleString(otherQueueName));
+ clearIO();
+ try
+ {
+ Binding binding = postOffice.getBinding(new SimpleString(otherQueueName));
- if (binding == null)
+ if (binding == null)
+ {
+ throw new IllegalArgumentException("No queue found for " + otherQueueName);
+ }
+
+ return queue.moveReference(messageID, binding.getAddress());
+ }
+ finally
{
- throw new IllegalArgumentException("No queue found for " + otherQueueName);
+ blockOnIO();
}
- boolean retValue = queue.moveReference(messageID, binding.getAddress());
-
- // Waiting on IO otherwise the operation would return before the operation completed
- storageManager.waitOnOperations();
-
- return retValue;
}
public int moveMessages(final String filterStr, final String otherQueueName) throws Exception
{
- Filter filter = FilterImpl.createFilter(filterStr);
+ clearIO();
+ try
+ {
+ Filter filter = FilterImpl.createFilter(filterStr);
- Binding binding = postOffice.getBinding(new SimpleString(otherQueueName));
+ Binding binding = postOffice.getBinding(new SimpleString(otherQueueName));
- if (binding == null)
+ if (binding == null)
+ {
+ throw new IllegalArgumentException("No queue found for " + otherQueueName);
+ }
+
+ int retValue = queue.moveReferences(filter, binding.getAddress());
+
+ return retValue;
+ }
+ finally
{
- throw new IllegalArgumentException("No queue found for " + otherQueueName);
+ blockOnIO();
}
- int retValue = queue.moveReferences(filter, binding.getAddress());
-
- // Waiting on IO otherwise the operation would return before the operation completed
- storageManager.waitOnOperations();
-
- return retValue;
-
}
public int sendMessagesToDeadLetterAddress(final String filterStr) throws Exception
{
- Filter filter = FilterImpl.createFilter(filterStr);
+ clearIO();
+ try
+ {
+ Filter filter = FilterImpl.createFilter(filterStr);
- List<MessageReference> refs = queue.list(filter);
+ List<MessageReference> refs = queue.list(filter);
- for (MessageReference ref : refs)
+ for (MessageReference ref : refs)
+ {
+ sendMessageToDeadLetterAddress(ref.getMessage().getMessageID());
+ }
+
+ return refs.size();
+ }
+ finally
{
- sendMessageToDeadLetterAddress(ref.getMessage().getMessageID());
+ blockOnIO();
}
-
- // Waiting on IO otherwise the operation would return before the operation completed
- storageManager.waitOnOperations();
-
- return refs.size();
}
public boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception
{
- boolean retValue = queue.sendMessageToDeadLetterAddress(messageID);
-
- // Waiting on IO otherwise the operation would return before the operation completed
- storageManager.waitOnOperations();
+ clearIO();
+ try
+ {
- return retValue;
+ boolean retValue = queue.sendMessageToDeadLetterAddress(messageID);
+
+ return retValue;
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
- public int changeMessagesPriority(String filterStr, int newPriority) throws Exception
+ public int changeMessagesPriority(final String filterStr, final int newPriority) throws Exception
{
- Filter filter = FilterImpl.createFilter(filterStr);
+ clearIO();
+ try
+ {
+ Filter filter = FilterImpl.createFilter(filterStr);
- List<MessageReference> refs = queue.list(filter);
+ List<MessageReference> refs = queue.list(filter);
- for (MessageReference ref : refs)
+ for (MessageReference ref : refs)
+ {
+ changeMessagePriority(ref.getMessage().getMessageID(), newPriority);
+ }
+
+ return refs.size();
+ }
+ finally
{
- changeMessagePriority(ref.getMessage().getMessageID(), newPriority);
+ blockOnIO();
}
-
- return refs.size();
}
public boolean changeMessagePriority(final long messageID, final int newPriority) throws Exception
{
- if (newPriority < 0 || newPriority > 9)
+ clearIO();
+ try
{
- throw new IllegalArgumentException("invalid newPriority value: " + newPriority +
- ". It must be between 0 and 9 (both included)");
+ if (newPriority < 0 || newPriority > 9)
+ {
+ throw new IllegalArgumentException("invalid newPriority value: " + newPriority +
+ ". It must be between 0 and 9 (both included)");
+ }
+ return queue.changeReferencePriority(messageID, (byte)newPriority);
}
- return queue.changeReferencePriority(messageID, (byte)newPriority);
+ finally
+ {
+ blockOnIO();
+ }
}
public String listMessageCounter()
{
+ clearIO();
try
{
return MessageCounterInfo.toJSon(counter);
@@ -412,41 +608,101 @@
{
throw new IllegalStateException(e);
}
+ finally
+ {
+ blockOnIO();
+ }
}
public void resetMessageCounter()
{
- counter.resetCounter();
+ clearIO();
+ try
+ {
+ counter.resetCounter();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public String listMessageCounterAsHTML()
{
- return MessageCounterHelper.listMessageCounterAsHTML(new MessageCounter[] { counter });
+ clearIO();
+ try
+ {
+ return MessageCounterHelper.listMessageCounterAsHTML(new MessageCounter[] { counter });
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public String listMessageCounterHistory() throws Exception
{
- return MessageCounterHelper.listMessageCounterHistory(counter);
+ clearIO();
+ try
+ {
+ return MessageCounterHelper.listMessageCounterHistory(counter);
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public String listMessageCounterHistoryAsHTML()
{
- return MessageCounterHelper.listMessageCounterHistoryAsHTML(new MessageCounter[] { counter });
+ clearIO();
+ try
+ {
+ return MessageCounterHelper.listMessageCounterHistoryAsHTML(new MessageCounter[] { counter });
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public void pause()
{
- queue.pause();
+ clearIO();
+ try
+ {
+ queue.pause();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public void resume()
{
- queue.resume();
+ clearIO();
+ try
+ {
+ queue.resume();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public boolean isPaused() throws Exception
{
- return queue.isPaused();
+ clearIO();
+ try
+ {
+ return queue.isPaused();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
// Package protected ---------------------------------------------
@@ -455,5 +711,23 @@
// Private -------------------------------------------------------
+ private void clearIO()
+ {
+ storageManager.clearContext();
+ }
+
+ private void blockOnIO()
+ {
+ try
+ {
+ storageManager.waitOnOperations();
+ storageManager.clearContext();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
// Inner classes -------------------------------------------------
}
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-12-03 20:10:15 UTC (rev 8541)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-12-03 20:20:29 UTC (rev 8542)
@@ -19,6 +19,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
+import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.OperationContext;
@@ -194,16 +195,25 @@
private void execute(final IOAsyncTask task)
{
executorsPending.incrementAndGet();
- executor.execute(new Runnable()
+ try
{
- public void run()
+ executor.execute(new Runnable()
{
- // If any IO is done inside the callback, it needs to be done on a new context
- clearContext();
- task.done();
- executorsPending.decrementAndGet();
- }
- });
+ public void run()
+ {
+ // If any IO is done inside the callback, it needs to be done on a new context
+ clearContext();
+ task.done();
+ executorsPending.decrementAndGet();
+ }
+ });
+ }
+ catch (Throwable e)
+ {
+ log.warn("Error on executor's submit");
+ executorsPending.decrementAndGet();
+ task.onError(HornetQException.INTERNAL_ERROR, "It wasn't possible to complete IO operation - " + e.getMessage());
+ }
}
/* (non-Javadoc)
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2009-12-03 20:10:15 UTC (rev 8541)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2009-12-03 20:20:29 UTC (rev 8542)
@@ -52,6 +52,7 @@
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.ObjectNameBuilder;
+import org.hornetq.core.management.QueueControl;
import org.hornetq.core.management.ResourceNames;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.BindingType;
@@ -62,6 +63,7 @@
import org.hornetq.jms.HornetQQueue;
import org.hornetq.jms.HornetQTopic;
import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.jms.server.config.TopicConfiguration;
import org.hornetq.jms.server.management.JMSQueueControl;
import org.hornetq.jms.server.management.TopicControl;
import org.hornetq.utils.Pair;
@@ -101,7 +103,7 @@
private int serverIndex;
- HornetQBootstrapServer bootstrap;
+ private HornetQBootstrapServer bootstrap;
// Constructors ---------------------------------------------------------------------------------
@@ -168,6 +170,7 @@
bootstrap.shutDown();
started = false;
unbindAll();
+ bootstrap = null;
return true;
}
@@ -429,15 +432,15 @@
public void removeAllMessages(String destination, boolean isQueue) throws Exception
{
- SimpleString address = HornetQQueue.createAddressFromName(destination);
- if (!isQueue)
+ if (isQueue)
{
- address = HornetQTopic.createAddressFromName(destination);
- }
- Binding binding = getHornetQServer().getPostOffice().getBinding(address);
- if (binding != null && binding.getType() == BindingType.LOCAL_QUEUE)
+ JMSQueueControl queue = (JMSQueueControl)getHornetQServer().getManagementService().getResource(ResourceNames.JMS_QUEUE + destination);
+ queue.removeMessages(null);
+ }
+ else
{
- ((Queue)binding.getBindable()).deleteAllReferences();
+ TopicControl topic = (TopicControl)getHornetQServer().getManagementService().getResource(ResourceNames.JMS_TOPIC + destination);
+ topic.removeMessages(null);
}
}
@@ -472,3 +475,4 @@
// Inner classes --------------------------------------------------------------------------------
}
+
More information about the hornetq-commits
mailing list