[hornetq-commits] JBoss hornetq SVN: r8542 - in trunk: src/main/org/hornetq/core/persistence/impl/journal and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Dec 3 15:20:29 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-12-03 15:20:29 -0500 (Thu, 03 Dec 2009)
New Revision: 8542

Modified:
   trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
   trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
   trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
Log:
Fix for JMS Tests

Modified: trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java	2009-12-03 20:10:15 UTC (rev 8541)
+++ trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java	2009-12-03 20:20:29 UTC (rev 8542)
@@ -59,19 +59,18 @@
    private final PostOffice postOffice;
 
    private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
-   
+
    private final StorageManager storageManager;
 
    private MessageCounter counter;
 
    // Static --------------------------------------------------------
 
-   private static String toJSON(Map<String, Object>[] messages)
+   private static String toJSON(final Map<String, Object>[] messages)
    {
       JSONArray array = new JSONArray();
-      for (int i = 0; i < messages.length; i++)
+      for (Map<String, Object> message : messages)
       {
-         Map<String, Object> message = messages[i];
          array.put(new JSONObject(message));
       }
       return array.toString();
@@ -95,7 +94,7 @@
 
    // Public --------------------------------------------------------
 
-   public void setMessageCounter(MessageCounter counter)
+   public void setMessageCounter(final MessageCounter counter)
    {
       this.counter = counter;
    }
@@ -104,7 +103,15 @@
 
    public String getName()
    {
-      return queue.getName().toString();
+      clearIO();
+      try
+      {
+         return queue.getName().toString();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String getAddress()
@@ -114,123 +121,244 @@
 
    public String getFilter()
    {
-      Filter filter = queue.getFilter();
+      clearIO();
+      try
+      {
+         Filter filter = queue.getFilter();
 
-      return (filter != null) ? filter.getFilterString().toString() : null;
+         return filter != null ? filter.getFilterString().toString() : null;
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public boolean isDurable()
    {
-      return queue.isDurable();
+      clearIO();
+      try
+      {
+         return queue.isDurable();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public boolean isTemporary()
    {
-      return queue.isTemporary();
+      clearIO();
+      try
+      {
+         return queue.isTemporary();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public int getMessageCount()
    {
-      return queue.getMessageCount();
+      clearIO();
+      try
+      {
+         return queue.getMessageCount();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public int getConsumerCount()
    {
-      return queue.getConsumerCount();
+      clearIO();
+      try
+      {
+         return queue.getConsumerCount();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public int getDeliveringCount()
    {
-      return queue.getDeliveringCount();
+      clearIO();
+      try
+      {
+         return queue.getDeliveringCount();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public int getMessagesAdded()
    {
-      return queue.getMessagesAdded();
+      clearIO();
+      try
+      {
+         return queue.getMessagesAdded();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public long getID()
    {
-      return queue.getID();
+      clearIO();
+      try
+      {
+         return queue.getID();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public long getScheduledCount()
    {
-      return queue.getScheduledCount();
+      clearIO();
+      try
+      {
+         return queue.getScheduledCount();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String getDeadLetterAddress()
    {
-      AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
+      clearIO();
+      try
+      {
+         AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
 
-      if (addressSettings != null && addressSettings.getDeadLetterAddress() != null)
-      {
-         return addressSettings.getDeadLetterAddress().toString();
+         if (addressSettings != null && addressSettings.getDeadLetterAddress() != null)
+         {
+            return addressSettings.getDeadLetterAddress().toString();
+         }
+         else
+         {
+            return null;
+         }
       }
-      else
+      finally
       {
-         return null;
+         blockOnIO();
       }
    }
 
    public void setDeadLetterAddress(final String deadLetterAddress) throws Exception
    {
-      AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
+      clearIO();
+      try
+      {
+         AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
 
-      if (deadLetterAddress != null)
+         if (deadLetterAddress != null)
+         {
+            addressSettings.setDeadLetterAddress(new SimpleString(deadLetterAddress));
+         }
+      }
+      finally
       {
-         addressSettings.setDeadLetterAddress(new SimpleString(deadLetterAddress));
+         blockOnIO();
       }
    }
 
    public String getExpiryAddress()
    {
-      AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
+      clearIO();
+      try
+      {
+         AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
 
-      if (addressSettings != null && addressSettings.getExpiryAddress() != null)
-      {
-         return addressSettings.getExpiryAddress().toString();
+         if (addressSettings != null && addressSettings.getExpiryAddress() != null)
+         {
+            return addressSettings.getExpiryAddress().toString();
+         }
+         else
+         {
+            return null;
+         }
       }
-      else
+      finally
       {
-         return null;
+         blockOnIO();
       }
    }
 
    public void setExpiryAddress(final String expiryAddress) throws Exception
    {
-      AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
+      clearIO();
+      try
+      {
+         AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
 
-      SimpleString sExpiryAddress = new SimpleString(expiryAddress);
+         SimpleString sExpiryAddress = new SimpleString(expiryAddress);
 
-      if (expiryAddress != null)
+         if (expiryAddress != null)
+         {
+            addressSettings.setExpiryAddress(sExpiryAddress);
+         }
+
+         queue.setExpiryAddress(sExpiryAddress);
+      }
+      finally
       {
-         addressSettings.setExpiryAddress(sExpiryAddress);
+         blockOnIO();
       }
-
-      queue.setExpiryAddress(sExpiryAddress);
    }
 
    public Map<String, Object>[] listScheduledMessages() throws Exception
    {
-      List<MessageReference> refs = queue.getScheduledMessages();
-      Map<String, Object>[] messages = new Map[refs.size()];
-      int i = 0;
-      for (MessageReference ref : refs)
+      clearIO();
+      try
       {
-         Message message = ref.getMessage();
-         messages[i++] = message.toMap();
+         List<MessageReference> refs = queue.getScheduledMessages();
+         Map<String, Object>[] messages = new Map[refs.size()];
+         int i = 0;
+         for (MessageReference ref : refs)
+         {
+            Message message = ref.getMessage();
+            messages[i++] = message.toMap();
+         }
+         return messages;
       }
-      return messages;
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String listScheduledMessagesAsJSON() throws Exception
    {
-      return toJSON(listScheduledMessages());
+      clearIO();
+      try
+      {
+         return toJSON(listScheduledMessages());
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public Map<String, Object>[] listMessages(final String filterStr) throws Exception
    {
+      clearIO();
       try
       {
          Filter filter = FilterImpl.createFilter(filterStr);
@@ -248,22 +376,43 @@
       {
          throw new IllegalStateException(e.getMessage());
       }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
-   public String listMessagesAsJSON(String filter) throws Exception
+   public String listMessagesAsJSON(final String filter) throws Exception
    {
-      return toJSON(listMessages(filter));
+      clearIO();
+      try
+      {
+         return toJSON(listMessages(filter));
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public int countMessages(final String filterStr) throws Exception
    {
-      Filter filter = FilterImpl.createFilter(filterStr);
-      List<MessageReference> refs = queue.list(filter);
-      return refs.size();
+      clearIO();
+      try
+      {
+         Filter filter = FilterImpl.createFilter(filterStr);
+         List<MessageReference> refs = queue.list(filter);
+         return refs.size();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public boolean removeMessage(final long messageID) throws Exception
    {
+      clearIO();
       try
       {
          return queue.deleteReference(messageID);
@@ -272,138 +421,185 @@
       {
          throw new IllegalStateException(e.getMessage());
       }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public int removeMessages(final String filterStr) throws Exception
    {
-      Filter filter = FilterImpl.createFilter(filterStr);
-      
-      int retValue = queue.deleteMatchingReferences(filter);
-      
-      // Waiting on IO otherwise the operation would return before the operation completed
-      storageManager.waitOnOperations();
-      
-      return retValue;
+      clearIO();
+      try
+      {
+         Filter filter = FilterImpl.createFilter(filterStr);
+
+         return queue.deleteMatchingReferences(filter);
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public boolean expireMessage(final long messageID) throws Exception
    {
-      boolean retValue =queue.expireReference(messageID);
-      
-      // Waiting on IO otherwise the operation would return before the operation completed
-      storageManager.waitOnOperations();
-      
-      return retValue;
+      clearIO();
+      try
+      {
+         return queue.expireReference(messageID);
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public int expireMessages(final String filterStr) throws Exception
    {
+      clearIO();
       try
       {
          Filter filter = FilterImpl.createFilter(filterStr);
-         int retValue = queue.expireReferences(filter);
-         
-         // Waiting on IO otherwise the operation would return before the operation completed
-         storageManager.waitOnOperations();
-         
-         return retValue;
+         return queue.expireReferences(filter);
       }
       catch (HornetQException e)
       {
          throw new IllegalStateException(e.getMessage());
       }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public boolean moveMessage(final long messageID, final String otherQueueName) throws Exception
    {
-      Binding binding = postOffice.getBinding(new SimpleString(otherQueueName));
+      clearIO();
+      try
+      {
+         Binding binding = postOffice.getBinding(new SimpleString(otherQueueName));
 
-      if (binding == null)
+         if (binding == null)
+         {
+            throw new IllegalArgumentException("No queue found for " + otherQueueName);
+         }
+
+         return queue.moveReference(messageID, binding.getAddress());
+      }
+      finally
       {
-         throw new IllegalArgumentException("No queue found for " + otherQueueName);
+         blockOnIO();
       }
 
-      boolean retValue = queue.moveReference(messageID, binding.getAddress());
-      
-      // Waiting on IO otherwise the operation would return before the operation completed
-      storageManager.waitOnOperations();
-      
-      return retValue;
    }
 
    public int moveMessages(final String filterStr, final String otherQueueName) throws Exception
    {
-      Filter filter = FilterImpl.createFilter(filterStr);
+      clearIO();
+      try
+      {
+         Filter filter = FilterImpl.createFilter(filterStr);
 
-      Binding binding = postOffice.getBinding(new SimpleString(otherQueueName));
+         Binding binding = postOffice.getBinding(new SimpleString(otherQueueName));
 
-      if (binding == null)
+         if (binding == null)
+         {
+            throw new IllegalArgumentException("No queue found for " + otherQueueName);
+         }
+
+         int retValue = queue.moveReferences(filter, binding.getAddress());
+
+         return retValue;
+      }
+      finally
       {
-         throw new IllegalArgumentException("No queue found for " + otherQueueName);
+         blockOnIO();
       }
 
-      int retValue = queue.moveReferences(filter, binding.getAddress());
-      
-      // Waiting on IO otherwise the operation would return before the operation completed
-      storageManager.waitOnOperations();
-      
-      return retValue;
-      
    }
 
    public int sendMessagesToDeadLetterAddress(final String filterStr) throws Exception
    {
-      Filter filter = FilterImpl.createFilter(filterStr);
+      clearIO();
+      try
+      {
+         Filter filter = FilterImpl.createFilter(filterStr);
 
-      List<MessageReference> refs = queue.list(filter);
+         List<MessageReference> refs = queue.list(filter);
 
-      for (MessageReference ref : refs)
+         for (MessageReference ref : refs)
+         {
+            sendMessageToDeadLetterAddress(ref.getMessage().getMessageID());
+         }
+
+         return refs.size();
+      }
+      finally
       {
-         sendMessageToDeadLetterAddress(ref.getMessage().getMessageID());
+         blockOnIO();
       }
-      
-      // Waiting on IO otherwise the operation would return before the operation completed
-      storageManager.waitOnOperations();
-
-      return refs.size();
    }
 
    public boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception
    {
-      boolean retValue = queue.sendMessageToDeadLetterAddress(messageID);
-      
-      // Waiting on IO otherwise the operation would return before the operation completed
-      storageManager.waitOnOperations();
+      clearIO();
+      try
+      {
 
-      return retValue;
+         boolean retValue = queue.sendMessageToDeadLetterAddress(messageID);
+
+         return retValue;
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
-   public int changeMessagesPriority(String filterStr, int newPriority) throws Exception
+   public int changeMessagesPriority(final String filterStr, final int newPriority) throws Exception
    {
-      Filter filter = FilterImpl.createFilter(filterStr);
+      clearIO();
+      try
+      {
+         Filter filter = FilterImpl.createFilter(filterStr);
 
-      List<MessageReference> refs = queue.list(filter);
+         List<MessageReference> refs = queue.list(filter);
 
-      for (MessageReference ref : refs)
+         for (MessageReference ref : refs)
+         {
+            changeMessagePriority(ref.getMessage().getMessageID(), newPriority);
+         }
+
+         return refs.size();
+      }
+      finally
       {
-         changeMessagePriority(ref.getMessage().getMessageID(), newPriority);
+         blockOnIO();
       }
-
-      return refs.size();
    }
 
    public boolean changeMessagePriority(final long messageID, final int newPriority) throws Exception
    {
-      if (newPriority < 0 || newPriority > 9)
+      clearIO();
+      try
       {
-         throw new IllegalArgumentException("invalid newPriority value: " + newPriority +
-                                            ". It must be between 0 and 9 (both included)");
+         if (newPriority < 0 || newPriority > 9)
+         {
+            throw new IllegalArgumentException("invalid newPriority value: " + newPriority +
+                                               ". It must be between 0 and 9 (both included)");
+         }
+         return queue.changeReferencePriority(messageID, (byte)newPriority);
       }
-      return queue.changeReferencePriority(messageID, (byte)newPriority);
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String listMessageCounter()
    {
+      clearIO();
       try
       {
          return MessageCounterInfo.toJSon(counter);
@@ -412,41 +608,101 @@
       {
          throw new IllegalStateException(e);
       }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public void resetMessageCounter()
    {
-      counter.resetCounter();
+      clearIO();
+      try
+      {
+         counter.resetCounter();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String listMessageCounterAsHTML()
    {
-      return MessageCounterHelper.listMessageCounterAsHTML(new MessageCounter[] { counter });
+      clearIO();
+      try
+      {
+         return MessageCounterHelper.listMessageCounterAsHTML(new MessageCounter[] { counter });
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String listMessageCounterHistory() throws Exception
    {
-      return MessageCounterHelper.listMessageCounterHistory(counter);
+      clearIO();
+      try
+      {
+         return MessageCounterHelper.listMessageCounterHistory(counter);
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String listMessageCounterHistoryAsHTML()
    {
-      return MessageCounterHelper.listMessageCounterHistoryAsHTML(new MessageCounter[] { counter });
+      clearIO();
+      try
+      {
+         return MessageCounterHelper.listMessageCounterHistoryAsHTML(new MessageCounter[] { counter });
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public void pause()
    {
-      queue.pause();
+      clearIO();
+      try
+      {
+         queue.pause();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public void resume()
    {
-      queue.resume();
+      clearIO();
+      try
+      {
+         queue.resume();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public boolean isPaused() throws Exception
    {
-      return queue.isPaused();
+      clearIO();
+      try
+      {
+         return queue.isPaused();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    // Package protected ---------------------------------------------
@@ -455,5 +711,23 @@
 
    // Private -------------------------------------------------------
 
+   private void clearIO()
+   {
+      storageManager.clearContext();
+   }
+
+   private void blockOnIO()
+   {
+      try
+      {
+         storageManager.waitOnOperations();
+         storageManager.clearContext();
+      }
+      catch (Exception e)
+      {
+         throw new RuntimeException(e.getMessage(), e);
+      }
+   }
+
    // Inner classes -------------------------------------------------
 }

Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java	2009-12-03 20:10:15 UTC (rev 8541)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java	2009-12-03 20:20:29 UTC (rev 8542)
@@ -19,6 +19,7 @@
 import java.util.concurrent.Executor;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.journal.IOAsyncTask;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.persistence.OperationContext;
@@ -194,16 +195,25 @@
    private void execute(final IOAsyncTask task)
    {
       executorsPending.incrementAndGet();
-      executor.execute(new Runnable()
+      try
       {
-         public void run()
+         executor.execute(new Runnable()
          {
-            // If any IO is done inside the callback, it needs to be done on a new context
-            clearContext();
-            task.done();
-            executorsPending.decrementAndGet();
-         }
-      });
+            public void run()
+            {
+               // If any IO is done inside the callback, it needs to be done on a new context
+               clearContext();
+               task.done();
+               executorsPending.decrementAndGet();
+            }
+         });
+      }
+      catch (Throwable e)
+      {
+         log.warn("Error on executor's submit");
+         executorsPending.decrementAndGet();
+         task.onError(HornetQException.INTERNAL_ERROR, "It wasn't possible to complete IO operation - " + e.getMessage());
+      }
    }
 
    /* (non-Javadoc)

Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java	2009-12-03 20:10:15 UTC (rev 8541)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java	2009-12-03 20:20:29 UTC (rev 8542)
@@ -52,6 +52,7 @@
 import org.hornetq.core.config.TransportConfiguration;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.management.ObjectNameBuilder;
+import org.hornetq.core.management.QueueControl;
 import org.hornetq.core.management.ResourceNames;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.BindingType;
@@ -62,6 +63,7 @@
 import org.hornetq.jms.HornetQQueue;
 import org.hornetq.jms.HornetQTopic;
 import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.jms.server.config.TopicConfiguration;
 import org.hornetq.jms.server.management.JMSQueueControl;
 import org.hornetq.jms.server.management.TopicControl;
 import org.hornetq.utils.Pair;
@@ -101,7 +103,7 @@
 
    private int serverIndex;
 
-   HornetQBootstrapServer bootstrap;
+   private HornetQBootstrapServer bootstrap;
 
    // Constructors ---------------------------------------------------------------------------------
 
@@ -168,6 +170,7 @@
       bootstrap.shutDown();
       started = false;
       unbindAll();
+      bootstrap = null;
       return true;
    }
 
@@ -429,15 +432,15 @@
 
    public void removeAllMessages(String destination, boolean isQueue) throws Exception
    {
-      SimpleString address = HornetQQueue.createAddressFromName(destination);
-      if (!isQueue)
+      if (isQueue)
       {
-         address = HornetQTopic.createAddressFromName(destination);
-      }
-      Binding binding = getHornetQServer().getPostOffice().getBinding(address);
-      if (binding != null && binding.getType() == BindingType.LOCAL_QUEUE)
+         JMSQueueControl queue = (JMSQueueControl)getHornetQServer().getManagementService().getResource(ResourceNames.JMS_QUEUE + destination);
+         queue.removeMessages(null);
+      } 
+      else
       {
-         ((Queue)binding.getBindable()).deleteAllReferences();
+         TopicControl topic = (TopicControl)getHornetQServer().getManagementService().getResource(ResourceNames.JMS_TOPIC + destination);
+         topic.removeMessages(null);
       }
    }
 
@@ -472,3 +475,4 @@
    // Inner classes --------------------------------------------------------------------------------
 
 }
+



More information about the hornetq-commits mailing list