[hornetq-commits] JBoss hornetq SVN: r8544 - in trunk: src/main/org/hornetq/core/management/impl and 6 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Dec 3 18:54:23 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-12-03 18:54:22 -0500 (Thu, 03 Dec 2009)
New Revision: 8544

Added:
   trunk/src/main/org/hornetq/core/management/impl/AbstractControl.java
   trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/OperationContextUnitTest.java
Modified:
   trunk/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java
   trunk/src/main/org/hornetq/core/management/impl/AcceptorControlImpl.java
   trunk/src/main/org/hornetq/core/management/impl/AddressControlImpl.java
   trunk/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java
   trunk/src/main/org/hornetq/core/management/impl/BroadcastGroupControlImpl.java
   trunk/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java
   trunk/src/main/org/hornetq/core/management/impl/DiscoveryGroupControlImpl.java
   trunk/src/main/org/hornetq/core/management/impl/DivertControlImpl.java
   trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
   trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
   trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
   trunk/src/main/org/hornetq/core/persistence/OperationContext.java
   trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
   trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
   trunk/tests/src/org/hornetq/tests/integration/client/OrderTest.java
   trunk/tests/src/org/hornetq/tests/integration/management/ManagementServiceImplTest.java
Log:
Mainly making the management controllers block on the storage manager

Modified: trunk/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java	2009-12-03 23:46:53 UTC (rev 8543)
+++ trunk/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java	2009-12-03 23:54:22 UTC (rev 8544)
@@ -60,12 +60,20 @@
       {
          throw new HornetQException(errorCode, errorMessage);
       }
+
       return;
    }
    
    public boolean waitCompletion(final long timeout) throws Exception
    {
-      return latch.await(timeout, TimeUnit.MILLISECONDS);
+      boolean retValue = latch.await(timeout, TimeUnit.MILLISECONDS);
+
+      if (errorMessage != null)
+      {
+         throw new HornetQException(errorCode, errorMessage);
+      }
+      
+      return retValue;
    }
 
    /* (non-Javadoc)

Added: trunk/src/main/org/hornetq/core/management/impl/AbstractControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/AbstractControl.java	                        (rev 0)
+++ trunk/src/main/org/hornetq/core/management/impl/AbstractControl.java	2009-12-03 23:54:22 UTC (rev 8544)
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.management.impl;
+
+import javax.management.NotCompliantMBeanException;
+import javax.management.StandardMBean;
+
+import org.hornetq.core.persistence.StorageManager;
+
+/**
+ * Base class for management controls; provides helpers to clear and wait on the storage manager's pending operations.
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public abstract class AbstractControl extends StandardMBean
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   protected final StorageManager storageManager;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   public AbstractControl(Class<?> clazz, StorageManager storageManager) throws NotCompliantMBeanException
+   {
+      super(clazz);
+      this.storageManager = storageManager;
+   }
+
+   // Public --------------------------------------------------------
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   protected void clearIO()
+   {
+      // the storage manager could be null on the backup on certain components
+      if (storageManager != null)
+      {
+         storageManager.clearContext();
+      }
+   }
+
+   protected void blockOnIO()
+   {
+      // the storage manager could be null on the backup on certain components
+      if (storageManager != null)
+      {
+         try
+         {
+            storageManager.waitOnOperations();
+            storageManager.clearContext();
+         }
+         catch (Exception e)
+         {
+            throw new RuntimeException(e.getMessage(), e);
+         }
+      }
+
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Modified: trunk/src/main/org/hornetq/core/management/impl/AcceptorControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/AcceptorControlImpl.java	2009-12-03 23:46:53 UTC (rev 8543)
+++ trunk/src/main/org/hornetq/core/management/impl/AcceptorControlImpl.java	2009-12-03 23:54:22 UTC (rev 8544)
@@ -19,6 +19,7 @@
 
 import org.hornetq.core.config.TransportConfiguration;
 import org.hornetq.core.management.AcceptorControl;
+import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.remoting.spi.Acceptor;
 
 /**
@@ -28,7 +29,7 @@
  * 
  * Created 11 dec. 2008 17:09:04
  */
-public class AcceptorControlImpl extends StandardMBean implements AcceptorControl
+public class AcceptorControlImpl extends AbstractControl implements AcceptorControl
 {
 
    // Constants -----------------------------------------------------
@@ -43,10 +44,11 @@
 
    // Constructors --------------------------------------------------
 
-   public AcceptorControlImpl(final Acceptor acceptor, final TransportConfiguration configuration)
-      throws Exception
+   public AcceptorControlImpl(final Acceptor acceptor,
+                              final StorageManager storageManager,
+                              final TransportConfiguration configuration) throws Exception
    {
-      super(AcceptorControl.class);
+      super(AcceptorControl.class, storageManager);
       this.acceptor = acceptor;
       this.configuration = configuration;
    }
@@ -55,32 +57,80 @@
 
    public String getFactoryClassName()
    {
-      return configuration.getFactoryClassName();
+      clearIO();
+      try
+      {
+         return configuration.getFactoryClassName();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String getName()
    {
-      return configuration.getName();
+      clearIO();
+      try
+      {
+         return configuration.getName();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public Map<String, Object> getParameters()
    {
-      return configuration.getParams();
+      clearIO();
+      try
+      {
+         return configuration.getParams();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public boolean isStarted()
    {
-      return acceptor.isStarted();
+      clearIO();
+      try
+      {
+         return acceptor.isStarted();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public void start() throws Exception
    {
-      acceptor.start();
+      clearIO();
+      try
+      {
+         acceptor.start();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
-   
+
    public void stop() throws Exception
    {
-      acceptor.stop();
+      clearIO();
+      try
+      {
+         acceptor.stop();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    // Public --------------------------------------------------------

Modified: trunk/src/main/org/hornetq/core/management/impl/AddressControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/AddressControlImpl.java	2009-12-03 23:46:53 UTC (rev 8543)
+++ trunk/src/main/org/hornetq/core/management/impl/AddressControlImpl.java	2009-12-03 23:54:22 UTC (rev 8544)
@@ -21,6 +21,7 @@
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.management.AddressControl;
 import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.Binding;
 import org.hornetq.core.postoffice.Bindings;
 import org.hornetq.core.postoffice.PostOffice;
@@ -37,7 +38,7 @@
  * @version <tt>$Revision$</tt>
  * 
  */
-public class AddressControlImpl extends StandardMBean implements AddressControl
+public class AddressControlImpl extends AbstractControl implements AddressControl
 {
 
    // Constants -----------------------------------------------------
@@ -49,7 +50,7 @@
    private final SimpleString address;
 
    private final PostOffice postOffice;
-   
+
    private final PagingManager pagingManager;
 
    private final HierarchicalRepository<Set<Role>> securityRepository;
@@ -60,11 +61,11 @@
 
    public AddressControlImpl(final SimpleString address,
                              final PostOffice postOffice,
-                             final PagingManager pagingManager, 
-                             final HierarchicalRepository<Set<Role>> securityRepository)
-      throws Exception
+                             final PagingManager pagingManager,
+                             final StorageManager storageManager,
+                             final HierarchicalRepository<Set<Role>> securityRepository) throws Exception
    {
-      super(AddressControl.class);
+      super(AddressControl.class, storageManager);
       this.address = address;
       this.postOffice = postOffice;
       this.pagingManager = pagingManager;
@@ -82,6 +83,7 @@
 
    public String[] getQueueNames() throws Exception
    {
+      clearIO();
       try
       {
          Bindings bindings = postOffice.getBindingsForAddress(address);
@@ -97,49 +99,85 @@
       {
          throw new IllegalStateException(t.getMessage());
       }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public Object[] getRoles() throws Exception
    {
-      Set<Role> roles = securityRepository.getMatch(address.toString());
+      clearIO();
+      try
+      {
+         Set<Role> roles = securityRepository.getMatch(address.toString());
 
-      Object[] objRoles = new Object[roles.size()];
+         Object[] objRoles = new Object[roles.size()];
 
-      int i = 0;
-      for (Role role : roles)
+         int i = 0;
+         for (Role role : roles)
+         {
+            objRoles[i++] = new Object[] { role.getName(),
+                                          CheckType.SEND.hasRole(role),
+                                          CheckType.CONSUME.hasRole(role),
+                                          CheckType.CREATE_DURABLE_QUEUE.hasRole(role),
+                                          CheckType.DELETE_DURABLE_QUEUE.hasRole(role),
+                                          CheckType.CREATE_NON_DURABLE_QUEUE.hasRole(role),
+                                          CheckType.DELETE_NON_DURABLE_QUEUE.hasRole(role),
+                                          CheckType.MANAGE.hasRole(role) };
+         }
+         return objRoles;
+      }
+      finally
       {
-         objRoles[i++] = new Object[] { role.getName(),
-                                       CheckType.SEND.hasRole(role),
-                                       CheckType.CONSUME.hasRole(role),
-                                       CheckType.CREATE_DURABLE_QUEUE.hasRole(role),
-                                       CheckType.DELETE_DURABLE_QUEUE.hasRole(role),
-                                       CheckType.CREATE_NON_DURABLE_QUEUE.hasRole(role),
-                                       CheckType.DELETE_NON_DURABLE_QUEUE.hasRole(role),
-                                       CheckType.MANAGE.hasRole(role) };
+         blockOnIO();
       }
-      return objRoles;
    }
 
    public String getRolesAsJSON() throws Exception
    {
-      JSONArray json = new JSONArray();
-      Set<Role> roles = securityRepository.getMatch(address.toString());
+      clearIO();
+      try
+      {
+         JSONArray json = new JSONArray();
+         Set<Role> roles = securityRepository.getMatch(address.toString());
 
-      for (Role role : roles)
+         for (Role role : roles)
+         {
+            json.put(new JSONObject(role));
+         }
+         return json.toString();
+      }
+      finally
       {
-         json.put(new JSONObject(role));
+         blockOnIO();
       }
-      return json.toString();
    }
-   
+
    public long getNumberOfBytesPerPage() throws Exception
    {
-      return pagingManager.getPageStore(address).getPageSizeBytes();
+      clearIO();
+      try
+      {
+         return pagingManager.getPageStore(address).getPageSizeBytes();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public int getNumberOfPages() throws Exception
    {
-      return pagingManager.getPageStore(address).getNumberOfPages();     
+      clearIO();
+      try
+      {
+         return pagingManager.getPageStore(address).getNumberOfPages();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public synchronized void addRole(final String name,
@@ -151,43 +189,59 @@
                                     final boolean deleteNonDurableQueue,
                                     final boolean manage) throws Exception
    {
-      Set<Role> roles = securityRepository.getMatch(address.toString());
-      Role newRole = new Role(name,
-                              send,
-                              consume,
-                              createDurableQueue,
-                              deleteDurableQueue,
-                              createNonDurableQueue,
-                              deleteNonDurableQueue,
-                              manage);
-      boolean added = roles.add(newRole);
-      if (!added)
+      clearIO();
+      try
       {
-         throw new IllegalArgumentException("Role " + name + " already exists");
+         Set<Role> roles = securityRepository.getMatch(address.toString());
+         Role newRole = new Role(name,
+                                 send,
+                                 consume,
+                                 createDurableQueue,
+                                 deleteDurableQueue,
+                                 createNonDurableQueue,
+                                 deleteNonDurableQueue,
+                                 manage);
+         boolean added = roles.add(newRole);
+         if (!added)
+         {
+            throw new IllegalArgumentException("Role " + name + " already exists");
+         }
+         securityRepository.addMatch(address.toString(), roles);
       }
-      securityRepository.addMatch(address.toString(), roles);
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public synchronized void removeRole(final String role) throws Exception
    {
-      Set<Role> roles = securityRepository.getMatch(address.toString());
-      Iterator<Role> it = roles.iterator();
-      boolean removed = false;
-      while (it.hasNext())
+      clearIO();
+      try
       {
-         Role r = it.next();
-         if (r.getName().equals(role))
+         Set<Role> roles = securityRepository.getMatch(address.toString());
+         Iterator<Role> it = roles.iterator();
+         boolean removed = false;
+         while (it.hasNext())
          {
-            it.remove();
-            removed = true;
-            break;
+            Role r = it.next();
+            if (r.getName().equals(role))
+            {
+               it.remove();
+               removed = true;
+               break;
+            }
          }
+         if (!removed)
+         {
+            throw new IllegalArgumentException("Role " + role + " does not exist");
+         }
+         securityRepository.addMatch(address.toString(), roles);
       }
-      if (!removed)
+      finally
       {
-         throw new IllegalArgumentException("Role " + role + " does not exist");
+         blockOnIO();
       }
-      securityRepository.addMatch(address.toString(), roles);
    }
 
    // Package protected ---------------------------------------------

Modified: trunk/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java	2009-12-03 23:46:53 UTC (rev 8543)
+++ trunk/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java	2009-12-03 23:54:22 UTC (rev 8544)
@@ -13,10 +13,9 @@
 
 package org.hornetq.core.management.impl;
 
-import javax.management.StandardMBean;
-
 import org.hornetq.core.config.cluster.BridgeConfiguration;
 import org.hornetq.core.management.BridgeControl;
+import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.server.cluster.Bridge;
 
 /**
@@ -26,7 +25,7 @@
  * 
  * Created 11 dec. 2008 17:09:04
  */
-public class BridgeControlImpl extends StandardMBean implements BridgeControl
+public class BridgeControlImpl extends AbstractControl implements BridgeControl
 {
 
    // Constants -----------------------------------------------------
@@ -41,9 +40,11 @@
 
    // Constructors --------------------------------------------------
 
-   public BridgeControlImpl(final Bridge bridge, final BridgeConfiguration configuration) throws Exception
+   public BridgeControlImpl(final Bridge bridge,
+                            final StorageManager storageManager,
+                            final BridgeConfiguration configuration) throws Exception
    {
-      super(BridgeControl.class);
+      super(BridgeControl.class, storageManager);
       this.bridge = bridge;
       this.configuration = configuration;
    }
@@ -52,82 +53,202 @@
 
    public String[] getConnectorPair() throws Exception
    {
-      String[] pair = new String[2];
+      clearIO();
+      try
+      {
+         String[] pair = new String[2];
 
-      pair[0] = configuration.getConnectorPair().a;
-      pair[1] = configuration.getConnectorPair().b != null ? configuration.getConnectorPair().b : null;
+         pair[0] = configuration.getConnectorPair().a;
+         pair[1] = configuration.getConnectorPair().b != null ? configuration.getConnectorPair().b : null;
 
-      return pair;
+         return pair;
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String getForwardingAddress()
    {
-      return configuration.getForwardingAddress();
+      clearIO();
+      try
+      {
+         return configuration.getForwardingAddress();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String getQueueName()
    {
-      return configuration.getQueueName();
+      clearIO();
+      try
+      {
+         return configuration.getQueueName();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String getDiscoveryGroupName()
    {
-      return configuration.getDiscoveryGroupName();
+      clearIO();
+      try
+      {
+         return configuration.getDiscoveryGroupName();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String getFilterString()
    {
-      return configuration.getFilterString();
+      clearIO();
+      try
+      {
+         return configuration.getFilterString();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public int getReconnectAttempts()
    {
-      return configuration.getReconnectAttempts();
+      clearIO();
+      try
+      {
+         return configuration.getReconnectAttempts();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public boolean isFailoverOnServerShutdown()
    {
-      return configuration.isFailoverOnServerShutdown();
+      clearIO();
+      try
+      {
+         return configuration.isFailoverOnServerShutdown();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String getName()
    {
-      return configuration.getName();
+      clearIO();
+      try
+      {
+         return configuration.getName();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public long getRetryInterval()
    {
-      return configuration.getRetryInterval();
+      clearIO();
+      try
+      {
+         return configuration.getRetryInterval();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public double getRetryIntervalMultiplier()
    {
-      return configuration.getRetryIntervalMultiplier();
+      clearIO();
+      try
+      {
+         return configuration.getRetryIntervalMultiplier();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String getTransformerClassName()
    {
-      return configuration.getTransformerClassName();
+      clearIO();
+      try
+      {
+         return configuration.getTransformerClassName();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public boolean isStarted()
    {
-      return bridge.isStarted();
+      clearIO();
+      try
+      {
+         return bridge.isStarted();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public boolean isUseDuplicateDetection()
    {
-      return configuration.isUseDuplicateDetection();
+      clearIO();
+      try
+      {
+         return configuration.isUseDuplicateDetection();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public void start() throws Exception
    {
-      bridge.start();
+      clearIO();
+      try
+      {
+         bridge.start();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public void stop() throws Exception
    {
-      bridge.stop();
+      clearIO();
+      try
+      {
+         bridge.stop();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    // Public --------------------------------------------------------

Modified: trunk/src/main/org/hornetq/core/management/impl/BroadcastGroupControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/BroadcastGroupControlImpl.java	2009-12-03 23:46:53 UTC (rev 8543)
+++ trunk/src/main/org/hornetq/core/management/impl/BroadcastGroupControlImpl.java	2009-12-03 23:54:22 UTC (rev 8544)
@@ -17,6 +17,7 @@
 
 import org.hornetq.core.config.cluster.BroadcastGroupConfiguration;
 import org.hornetq.core.management.BroadcastGroupControl;
+import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.server.cluster.BroadcastGroup;
 import org.hornetq.utils.Pair;
 import org.hornetq.utils.json.JSONArray;
@@ -29,7 +30,7 @@
  * 
  * Created 11 dec. 2008 17:09:04
  */
-public class BroadcastGroupControlImpl extends StandardMBean implements BroadcastGroupControl
+public class BroadcastGroupControlImpl extends AbstractControl implements BroadcastGroupControl
 {
 
    // Constants -----------------------------------------------------
@@ -44,88 +45,169 @@
 
    // Constructors --------------------------------------------------
 
-   public BroadcastGroupControlImpl(final BroadcastGroup broadcastGroup, final BroadcastGroupConfiguration configuration)
-      throws Exception
+   public BroadcastGroupControlImpl(final BroadcastGroup broadcastGroup,
+                                    final StorageManager storageManager,
+                                    final BroadcastGroupConfiguration configuration) throws Exception
    {
-      super(BroadcastGroupControl.class);
+      super(BroadcastGroupControl.class, storageManager);
       this.broadcastGroup = broadcastGroup;
       this.configuration = configuration;
    }
 
    // BroadcastGroupControlMBean implementation ---------------------
-   
+
    public String getName()
    {
-      return configuration.getName();
+      clearIO();
+      try
+      {
+         return configuration.getName();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public long getBroadcastPeriod()
    {
-      return configuration.getBroadcastPeriod();
+      clearIO();
+      try
+      {
+         return configuration.getBroadcastPeriod();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public Object[] getConnectorPairs()
    {
-      Object[] ret = new Object[configuration.getConnectorInfos().size()];
-      
-      int i = 0;
-      for (Pair<String, String> pair: configuration.getConnectorInfos())
+      clearIO();
+      try
       {
-         String[] opair = new String[2];
-         
-         opair[0] = pair.a;
-         opair[1] = pair.b != null ? pair.b : null;
-         
-         ret[i++] = opair;
+         Object[] ret = new Object[configuration.getConnectorInfos().size()];
+
+         int i = 0;
+         for (Pair<String, String> pair : configuration.getConnectorInfos())
+         {
+            String[] opair = new String[2];
+
+            opair[0] = pair.a;
+            opair[1] = pair.b != null ? pair.b : null;
+
+            ret[i++] = opair;
+         }
+
+         return ret;
       }
-      
-      return ret;      
+      finally
+      {
+         blockOnIO();
+      }
    }
-   
+
    public String getConnectorPairsAsJSON() throws Exception
    {
-      JSONArray array = new JSONArray();
-      
-      for (Pair<String, String> pair: configuration.getConnectorInfos())
+      clearIO();
+      try
       {
-         JSONObject p = new JSONObject();
-         p.put("a", pair.a);
-         p.put("b", pair.b);
-         array.put(p);
+         JSONArray array = new JSONArray();
+
+         for (Pair<String, String> pair : configuration.getConnectorInfos())
+         {
+            JSONObject p = new JSONObject();
+            p.put("a", pair.a);
+            p.put("b", pair.b);
+            array.put(p);
+         }
+         return array.toString();
       }
-      return array.toString();
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String getGroupAddress()
    {
-      return configuration.getGroupAddress();
+      clearIO();
+      try
+      {
+         return configuration.getGroupAddress();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public int getGroupPort()
    {
-      return configuration.getGroupPort();
+      clearIO();
+      try
+      {
+         return configuration.getGroupPort();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public int getLocalBindPort()
    {
-      return configuration.getLocalBindPort();
+      clearIO();
+      try
+      {
+         return configuration.getLocalBindPort();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    // MessagingComponentControlMBean implementation -----------------
 
    public boolean isStarted()
    {
-      return broadcastGroup.isStarted();
+      clearIO();
+      try
+      {
+         return broadcastGroup.isStarted();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public void start() throws Exception
    {
-      broadcastGroup.start();
+      clearIO();
+      try
+      {
+         broadcastGroup.start();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public void stop() throws Exception
    {
-      broadcastGroup.stop();
+      clearIO();
+      try
+      {
+         broadcastGroup.stop();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    // Public --------------------------------------------------------

Modified: trunk/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java	2009-12-03 23:46:53 UTC (rev 8543)
+++ trunk/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java	2009-12-03 23:54:22 UTC (rev 8544)
@@ -20,6 +20,7 @@
 
 import org.hornetq.core.config.cluster.ClusterConnectionConfiguration;
 import org.hornetq.core.management.ClusterConnectionControl;
+import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.server.cluster.ClusterConnection;
 import org.hornetq.utils.Pair;
 import org.hornetq.utils.json.JSONArray;
@@ -30,7 +31,7 @@
  *
  * @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
  */
-public class ClusterConnectionControlImpl extends StandardMBean implements ClusterConnectionControl
+public class ClusterConnectionControlImpl extends AbstractControl implements ClusterConnectionControl
 {
 
    // Constants -----------------------------------------------------
@@ -46,9 +47,10 @@
    // Constructors --------------------------------------------------
 
    public ClusterConnectionControlImpl(final ClusterConnection clusterConnection,
+                                       final StorageManager storageManager,
                                        ClusterConnectionConfiguration configuration) throws Exception
    {
-      super(ClusterConnectionControl.class);
+      super(ClusterConnectionControl.class, storageManager);
       this.clusterConnection = clusterConnection;
       this.configuration = configuration;
    }
@@ -57,108 +59,225 @@
 
    public String getAddress()
    {
-      return configuration.getAddress();
+      clearIO();
+      try
+      {
+         return configuration.getAddress();
+      }
+      finally
+      {
+         blockOnIO();
+      }
+
    }
 
    public String getDiscoveryGroupName()
    {
-      return configuration.getDiscoveryGroupName();
+      clearIO();
+      try
+      {
+         return configuration.getDiscoveryGroupName();
+      }
+      finally
+      {
+         blockOnIO();
+      }
+
    }
 
    public int getMaxHops()
    {
-      return configuration.getMaxHops();
+      clearIO();
+      try
+      {
+         return configuration.getMaxHops();
+      }
+      finally
+      {
+         blockOnIO();
+      }
+
    }
 
    public String getName()
    {
-      return configuration.getName();
+      clearIO();
+      try
+      {
+         return configuration.getName();
+      }
+      finally
+      {
+         blockOnIO();
+      }
+
    }
 
    public long getRetryInterval()
    {
-      return configuration.getRetryInterval();
+      clearIO();
+      try
+      {
+         return configuration.getRetryInterval();
+      }
+      finally
+      {
+         blockOnIO();
+      }
+
    }
-   
+
    public String getNodeID()
    {
-      return clusterConnection.getNodeID();
+      clearIO();
+      try
+      {
+         return clusterConnection.getNodeID();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public Object[] getStaticConnectorNamePairs()
    {
-      List<Pair<String, String>> pairs = configuration.getStaticConnectorNamePairs();
-      
-      if (pairs == null)
+      clearIO();
+      try
       {
-         return null;
-      }
-         
-      Object[] ret = new Object[pairs.size()];
+         List<Pair<String, String>> pairs = configuration.getStaticConnectorNamePairs();
 
-      int i = 0;
-      for (Pair<String, String> pair : configuration.getStaticConnectorNamePairs())
-      {
-         String[] opair = new String[2];
+         if (pairs == null)
+         {
+            return null;
+         }
 
-         opair[0] = pair.a;
-         opair[1] = pair.b != null ? pair.b : null;
+         Object[] ret = new Object[pairs.size()];
 
-         ret[i++] = opair;
+         int i = 0;
+         for (Pair<String, String> pair : configuration.getStaticConnectorNamePairs())
+         {
+            String[] opair = new String[2];
+
+            opair[0] = pair.a;
+            opair[1] = pair.b != null ? pair.b : null;
+
+            ret[i++] = opair;
+         }
+
+         return ret;
       }
-
-      return ret;
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String getStaticConnectorNamePairsAsJSON() throws Exception
    {
-      List<Pair<String, String>> pairs = configuration.getStaticConnectorNamePairs();
-      
-      if (pairs == null)
+      clearIO();
+      try
       {
-         return null;
+         List<Pair<String, String>> pairs = configuration.getStaticConnectorNamePairs();
+
+         if (pairs == null)
+         {
+            return null;
+         }
+
+         JSONArray array = new JSONArray();
+
+         for (Pair<String, String> pair : pairs)
+         {
+            JSONObject p = new JSONObject();
+            p.put("a", pair.a);
+            p.put("b", pair.b);
+            array.put(p);
+         }
+         return array.toString();
       }
-      
-      JSONArray array = new JSONArray();
-
-      for (Pair<String, String> pair : pairs)
+      finally
       {
-         JSONObject p = new JSONObject();
-         p.put("a", pair.a);
-         p.put("b", pair.b);
-         array.put(p);
+         blockOnIO();
       }
-      return array.toString();
    }
 
    public boolean isDuplicateDetection()
    {
-      return configuration.isDuplicateDetection();
+      clearIO();
+      try
+      {
+         return configuration.isDuplicateDetection();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public boolean isForwardWhenNoConsumers()
    {
-      return configuration.isForwardWhenNoConsumers();
+      clearIO();
+      try
+      {
+         return configuration.isForwardWhenNoConsumers();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public Map<String, String> getNodes() throws Exception
    {
-      return clusterConnection.getNodes();
+      clearIO();
+      try
+      {
+         return clusterConnection.getNodes();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
-   
+
    public boolean isStarted()
    {
-      return clusterConnection.isStarted();
+      clearIO();
+      try
+      {
+         return clusterConnection.isStarted();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public void start() throws Exception
    {
-      clusterConnection.start();
+      clearIO();
+      try
+      {
+         clusterConnection.start();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public void stop() throws Exception
    {
-      clusterConnection.stop();
+      clearIO();
+      try
+      {
+         clusterConnection.stop();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    // Public --------------------------------------------------------

Modified: trunk/src/main/org/hornetq/core/management/impl/DiscoveryGroupControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/DiscoveryGroupControlImpl.java	2009-12-03 23:46:53 UTC (rev 8543)
+++ trunk/src/main/org/hornetq/core/management/impl/DiscoveryGroupControlImpl.java	2009-12-03 23:54:22 UTC (rev 8544)
@@ -18,6 +18,7 @@
 import org.hornetq.core.cluster.DiscoveryGroup;
 import org.hornetq.core.config.cluster.DiscoveryGroupConfiguration;
 import org.hornetq.core.management.DiscoveryGroupControl;
+import org.hornetq.core.persistence.StorageManager;
 
 /**
  * A AcceptorControl
@@ -26,7 +27,7 @@
  * 
  * Created 11 dec. 2008 17:09:04
  */
-public class DiscoveryGroupControlImpl extends StandardMBean implements DiscoveryGroupControl
+public class DiscoveryGroupControlImpl extends AbstractControl implements DiscoveryGroupControl
 {
 
    // Constants -----------------------------------------------------
@@ -41,10 +42,11 @@
 
    // Constructors --------------------------------------------------
 
-   public DiscoveryGroupControlImpl(final DiscoveryGroup acceptor, final DiscoveryGroupConfiguration configuration)
-      throws Exception
+   public DiscoveryGroupControlImpl(final DiscoveryGroup acceptor,
+                                    final StorageManager storageManager,
+                                    final DiscoveryGroupConfiguration configuration) throws Exception
    {
-      super(DiscoveryGroupControl.class);
+      super(DiscoveryGroupControl.class, storageManager);
       this.discoveryGroup = acceptor;
       this.configuration = configuration;
    }
@@ -53,37 +55,99 @@
 
    public String getName()
    {
-      return configuration.getName();
+      clearIO();
+      try
+      {
+         return configuration.getName();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String getGroupAddress()
    {
-      return configuration.getGroupAddress();
+      clearIO();
+      try
+      {
+         return configuration.getGroupAddress();
+      }
+      finally
+      {
+         blockOnIO();
+      }
+
    }
 
    public int getGroupPort()
    {
-      return configuration.getGroupPort();
+      clearIO();
+      try
+      {
+         return configuration.getGroupPort();
+      }
+      finally
+      {
+         blockOnIO();
+      }
+
    }
 
    public long getRefreshTimeout()
    {
-      return configuration.getRefreshTimeout();
+      clearIO();
+      try
+      {
+         return configuration.getRefreshTimeout();
+      }
+      finally
+      {
+         blockOnIO();
+      }
+
    }
 
    public boolean isStarted()
    {
-      return discoveryGroup.isStarted();
+      clearIO();
+      try
+      {
+         return discoveryGroup.isStarted();
+      }
+      finally
+      {
+         blockOnIO();
+      }
+
    }
 
    public void start() throws Exception
    {
-      discoveryGroup.start();
+      clearIO();
+      try
+      {
+         discoveryGroup.start();
+      }
+      finally
+      {
+         blockOnIO();
+      }
+
    }
 
    public void stop() throws Exception
    {
-      discoveryGroup.stop();
+      clearIO();
+      try
+      {
+         discoveryGroup.stop();
+      }
+      finally
+      {
+         blockOnIO();
+      }
+
    }
 
    // Public --------------------------------------------------------

Modified: trunk/src/main/org/hornetq/core/management/impl/DivertControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/DivertControlImpl.java	2009-12-03 23:46:53 UTC (rev 8543)
+++ trunk/src/main/org/hornetq/core/management/impl/DivertControlImpl.java	2009-12-03 23:54:22 UTC (rev 8544)
@@ -17,6 +17,7 @@
 
 import org.hornetq.core.config.cluster.DivertConfiguration;
 import org.hornetq.core.management.DivertControl;
+import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.server.Divert;
 
 /**
@@ -26,7 +27,7 @@
  * 
  * Created 11 dec. 2008 17:09:04
  */
-public class DivertControlImpl extends StandardMBean implements DivertControl
+public class DivertControlImpl extends AbstractControl implements DivertControl
 {
 
    // Constants -----------------------------------------------------
@@ -43,47 +44,104 @@
 
    // DivertControlMBean implementation ---------------------------
 
-   public DivertControlImpl(final Divert divert, final DivertConfiguration configuration)
-      throws Exception
+   public DivertControlImpl(final Divert divert,
+                            final StorageManager storageManager,
+                            final DivertConfiguration configuration) throws Exception
    {
-      super(DivertControl.class);
+      super(DivertControl.class, storageManager);
       this.divert = divert;
       this.configuration = configuration;
    }
 
    public String getAddress()
    {
-      return configuration.getAddress();
+      clearIO();
+      try
+      {
+         return configuration.getAddress();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String getFilter()
    {
-      return configuration.getFilterString();
+      clearIO();
+      try
+      {
+         return configuration.getFilterString();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String getForwardingAddress()
    {
-      return configuration.getForwardingAddress();
+      clearIO();
+      try
+      {
+         return configuration.getForwardingAddress();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String getRoutingName()
    {
-      return divert.getRoutingName().toString();
+      clearIO();
+      try
+      {
+         return divert.getRoutingName().toString();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String getTransformerClassName()
    {
-      return configuration.getTransformerClassName();
+      clearIO();
+      try
+      {
+         return configuration.getTransformerClassName();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String getUniqueName()
    {
-      return divert.getUniqueName().toString();
+      clearIO();
+      try
+      {
+         return divert.getUniqueName().toString();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public boolean isExclusive()
    {
-      return divert.isExclusive();
+      clearIO();
+      try
+      {
+         return divert.isExclusive();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    // Public --------------------------------------------------------

Modified: trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java	2009-12-03 23:46:53 UTC (rev 8543)
+++ trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java	2009-12-03 23:54:22 UTC (rev 8544)
@@ -30,7 +30,6 @@
 import javax.management.NotificationEmitter;
 import javax.management.NotificationFilter;
 import javax.management.NotificationListener;
-import javax.management.StandardMBean;
 import javax.transaction.xa.Xid;
 
 import org.hornetq.core.config.Configuration;
@@ -43,6 +42,7 @@
 import org.hornetq.core.management.QueueControl;
 import org.hornetq.core.messagecounter.MessageCounterManager;
 import org.hornetq.core.messagecounter.impl.MessageCounterManagerImpl;
+import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.postoffice.PostOffice;
 import org.hornetq.core.remoting.RemotingConnection;
 import org.hornetq.core.remoting.server.RemotingService;
@@ -62,7 +62,7 @@
  * @version <tt>$Revision$</tt>
  * 
  */
-public class HornetQServerControlImpl extends StandardMBean implements HornetQServerControl, NotificationEmitter
+public class HornetQServerControlImpl extends AbstractControl implements HornetQServerControl, NotificationEmitter
 {
    // Constants -----------------------------------------------------
 
@@ -94,14 +94,15 @@
                                    final RemotingService remotingService,
                                    final HornetQServer messagingServer,
                                    final MessageCounterManager messageCounterManager,
+                                   final StorageManager storageManager,
                                    final NotificationBroadcasterSupport broadcaster) throws Exception
    {
-      super(HornetQServerControl.class);
+      super(HornetQServerControl.class, storageManager);
       this.postOffice = postOffice;
       this.configuration = configuration;
       this.resourceManager = resourceManager;
       this.remotingService = remotingService;
-      this.server = messagingServer;
+      server = messagingServer;
       this.messageCounterManager = messageCounterManager;
       this.broadcaster = broadcaster;
    }
@@ -117,468 +118,940 @@
 
    public boolean isStarted()
    {
-      return server.isStarted();
+      clearIO();
+      try
+      {
+         return server.isStarted();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String getVersion()
    {
-      return server.getVersion().getFullVersion();
+      clearIO();
+      try
+      {
+         return server.getVersion().getFullVersion();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public boolean isBackup()
    {
-      return configuration.isBackup();
+      clearIO();
+      try
+      {
+         return configuration.isBackup();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public boolean isSharedStore()
    {
-      return configuration.isSharedStore();
+      clearIO();
+      try
+      {
+         return configuration.isSharedStore();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String getBackupConnectorName()
    {
-      return configuration.getBackupConnectorName();
+      clearIO();
+      try
+      {
+         return configuration.getBackupConnectorName();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String getBindingsDirectory()
    {
-      return configuration.getBindingsDirectory();
+      clearIO();
+      try
+      {
+         return configuration.getBindingsDirectory();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String[] getInterceptorClassNames()
    {
-      return configuration.getInterceptorClassNames().toArray(new String[configuration.getInterceptorClassNames()
-                                                                                      .size()]);
+      clearIO();
+      try
+      {
+         return configuration.getInterceptorClassNames().toArray(new String[configuration.getInterceptorClassNames()
+                                                                                         .size()]);
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public int getJournalBufferSize()
    {
-      return configuration.getJournalType() == JournalType.ASYNCIO ? configuration.getJournalBufferSize_AIO()
-                                                                  : configuration.getJournalBufferSize_NIO();
+      clearIO();
+      try
+      {
+         return configuration.getJournalType() == JournalType.ASYNCIO ? configuration.getJournalBufferSize_AIO()
+                                                                     : configuration.getJournalBufferSize_NIO();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public int getJournalBufferTimeout()
    {
-      return configuration.getJournalType() == JournalType.ASYNCIO ? configuration.getJournalBufferTimeout_AIO()
-                                                                  : configuration.getJournalBufferTimeout_NIO();
+      clearIO();
+      try
+      {
+         return configuration.getJournalType() == JournalType.ASYNCIO ? configuration.getJournalBufferTimeout_AIO()
+                                                                     : configuration.getJournalBufferTimeout_NIO();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public int getJournalMaxIO()
    {
-      return configuration.getJournalType() == JournalType.ASYNCIO ? configuration.getJournalMaxIO_AIO()
-                                                                  : configuration.getJournalMaxIO_NIO();
+      clearIO();
+      try
+      {
+         return configuration.getJournalType() == JournalType.ASYNCIO ? configuration.getJournalMaxIO_AIO()
+                                                                     : configuration.getJournalMaxIO_NIO();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String getJournalDirectory()
    {
-      return configuration.getJournalDirectory();
+      clearIO();
+      try
+      {
+         return configuration.getJournalDirectory();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public int getJournalFileSize()
    {
-      return configuration.getJournalFileSize();
+      clearIO();
+      try
+      {
+         return configuration.getJournalFileSize();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public int getJournalMinFiles()
    {
-      return configuration.getJournalMinFiles();
+      clearIO();
+      try
+      {
+         return configuration.getJournalMinFiles();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public int getJournalCompactMinFiles()
    {
-      return configuration.getJournalCompactMinFiles();
+      clearIO();
+      try
+      {
+         return configuration.getJournalCompactMinFiles();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public int getJournalCompactPercentage()
    {
-      return configuration.getJournalCompactPercentage();
+      clearIO();
+      try
+      {
+         return configuration.getJournalCompactPercentage();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public boolean isPersistenceEnabled()
    {
-      return configuration.isPersistenceEnabled();
+      clearIO();
+      try
+      {
+         return configuration.isPersistenceEnabled();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String getJournalType()
    {
-      return configuration.getJournalType().toString();
+      clearIO();
+      try
+      {
+         return configuration.getJournalType().toString();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String getPagingDirectory()
    {
-      return configuration.getPagingDirectory();
+      clearIO();
+      try
+      {
+         return configuration.getPagingDirectory();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public int getScheduledThreadPoolMaxSize()
    {
-      return configuration.getScheduledThreadPoolMaxSize();
+      clearIO();
+      try
+      {
+         return configuration.getScheduledThreadPoolMaxSize();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public int getThreadPoolMaxSize()
    {
-      return configuration.getThreadPoolMaxSize();
+      clearIO();
+      try
+      {
+         return configuration.getThreadPoolMaxSize();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public long getSecurityInvalidationInterval()
    {
-      return configuration.getSecurityInvalidationInterval();
+      clearIO();
+      try
+      {
+         return configuration.getSecurityInvalidationInterval();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public boolean isClustered()
    {
-      return configuration.isClustered();
+      clearIO();
+      try
+      {
+         return configuration.isClustered();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public boolean isCreateBindingsDir()
    {
-      return configuration.isCreateBindingsDir();
+      clearIO();
+      try
+      {
+         return configuration.isCreateBindingsDir();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public boolean isCreateJournalDir()
    {
-      return configuration.isCreateJournalDir();
+      clearIO();
+      try
+      {
+         return configuration.isCreateJournalDir();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public boolean isJournalSyncNonTransactional()
    {
-      return configuration.isJournalSyncNonTransactional();
+      clearIO();
+      try
+      {
+         return configuration.isJournalSyncNonTransactional();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public boolean isJournalSyncTransactional()
    {
-      return configuration.isJournalSyncTransactional();
+      clearIO();
+      try
+      {
+         return configuration.isJournalSyncTransactional();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public boolean isSecurityEnabled()
    {
-      return configuration.isSecurityEnabled();
+      clearIO();
+      try
+      {
+         return configuration.isSecurityEnabled();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
-   public void deployQueue(final String address, final String name, String filterString) throws Exception
+   public void deployQueue(final String address, final String name, final String filterString) throws Exception
    {
-      server.deployQueue(new SimpleString(address), new SimpleString(name), new SimpleString(filterString), true, false);
+      clearIO();
+      try
+      {
+         server.deployQueue(new SimpleString(address),
+                            new SimpleString(name),
+                            new SimpleString(filterString),
+                            true,
+                            false);
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public void deployQueue(final String address, final String name, final String filterStr, final boolean durable) throws Exception
    {
       SimpleString filter = filterStr == null ? null : new SimpleString(filterStr);
+      clearIO();
+      try
+      {
 
-      server.deployQueue(new SimpleString(address), new SimpleString(name), filter, durable, false);
+         server.deployQueue(new SimpleString(address), new SimpleString(name), filter, durable, false);
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public void createQueue(final String address, final String name) throws Exception
    {
-      server.createQueue(new SimpleString(address), new SimpleString(name), null, true, false);
+      clearIO();
+      try
+      {
+         server.createQueue(new SimpleString(address), new SimpleString(name), null, true, false);
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public void createQueue(final String address, final String name, final boolean durable) throws Exception
    {
-      server.createQueue(new SimpleString(address), new SimpleString(name), null, durable, false);
+      clearIO();
+      try
+      {
+         server.createQueue(new SimpleString(address), new SimpleString(name), null, durable, false);
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public void createQueue(final String address, final String name, final String filterStr, final boolean durable) throws Exception
    {
-      SimpleString filter = null;
-      if (filterStr != null && !filterStr.trim().equals(""))
+      clearIO();
+      try
       {
-         filter = new SimpleString(filterStr);
+         SimpleString filter = null;
+         if (filterStr != null && !filterStr.trim().equals(""))
+         {
+            filter = new SimpleString(filterStr);
+         }
+
+         server.createQueue(new SimpleString(address), new SimpleString(name), filter, durable, false);
       }
-
-      server.createQueue(new SimpleString(address), new SimpleString(name), filter, durable, false);
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String[] getQueueNames()
    {
-      Object[] queues = server.getManagementService().getResources(QueueControl.class);
-      String[] names = new String[queues.length];
-      for (int i = 0; i < queues.length; i++)
+      clearIO();
+      try
       {
-         QueueControl queue = (QueueControl)queues[i];
-         names[i] = queue.getName();
+         Object[] queues = server.getManagementService().getResources(QueueControl.class);
+         String[] names = new String[queues.length];
+         for (int i = 0; i < queues.length; i++)
+         {
+            QueueControl queue = (QueueControl)queues[i];
+            names[i] = queue.getName();
+         }
+
+         return names;
       }
-
-      return names;
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String[] getAddressNames()
    {
-      Object[] addresses = server.getManagementService().getResources(AddressControl.class);
-      String[] names = new String[addresses.length];
-      for (int i = 0; i < addresses.length; i++)
+      clearIO();
+      try
       {
-         AddressControl address = (AddressControl)addresses[i];
-         names[i] = address.getAddress();
+         Object[] addresses = server.getManagementService().getResources(AddressControl.class);
+         String[] names = new String[addresses.length];
+         for (int i = 0; i < addresses.length; i++)
+         {
+            AddressControl address = (AddressControl)addresses[i];
+            names[i] = address.getAddress();
+         }
+
+         return names;
       }
-
-      return names;
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public void destroyQueue(final String name) throws Exception
    {
-      SimpleString queueName = new SimpleString(name);
+      clearIO();
+      try
+      {
+         SimpleString queueName = new SimpleString(name);
 
-      server.destroyQueue(queueName, null);
+         server.destroyQueue(queueName, null);
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public int getConnectionCount()
    {
-      return server.getConnectionCount();
+      clearIO();
+      try
+      {
+         return server.getConnectionCount();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public void enableMessageCounters()
    {
-      setMessageCounterEnabled(true);
+      clearIO();
+      try
+      {
+         setMessageCounterEnabled(true);
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public void disableMessageCounters()
    {
-      setMessageCounterEnabled(false);
+      clearIO();
+      try
+      {
+         setMessageCounterEnabled(false);
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public void resetAllMessageCounters()
    {
-      messageCounterManager.resetAllCounters();
+      clearIO();
+      try
+      {
+         messageCounterManager.resetAllCounters();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public void resetAllMessageCounterHistories()
    {
-      messageCounterManager.resetAllCounterHistories();
+      clearIO();
+      try
+      {
+         messageCounterManager.resetAllCounterHistories();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public boolean isMessageCounterEnabled()
    {
-      return configuration.isMessageCounterEnabled();
+      clearIO();
+      try
+      {
+         return configuration.isMessageCounterEnabled();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public synchronized long getMessageCounterSamplePeriod()
    {
-      return messageCounterManager.getSamplePeriod();
+      clearIO();
+      try
+      {
+         return messageCounterManager.getSamplePeriod();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public synchronized void setMessageCounterSamplePeriod(final long newPeriod)
    {
-      if (newPeriod < MessageCounterManagerImpl.MIN_SAMPLE_PERIOD)
+      clearIO();
+      try
       {
-         throw new IllegalArgumentException("Cannot set MessageCounterSamplePeriod < " + MessageCounterManagerImpl.MIN_SAMPLE_PERIOD +
-                                            " ms");
+         if (newPeriod < MessageCounterManagerImpl.MIN_SAMPLE_PERIOD)
+         {
+            throw new IllegalArgumentException("Cannot set MessageCounterSamplePeriod < " + MessageCounterManagerImpl.MIN_SAMPLE_PERIOD +
+                                               " ms");
+         }
+
+         if (messageCounterManager != null && newPeriod != messageCounterManager.getSamplePeriod())
+         {
+            messageCounterManager.reschedule(newPeriod);
+         }
       }
-
-      if (messageCounterManager != null && newPeriod != messageCounterManager.getSamplePeriod())
+      finally
       {
-         messageCounterManager.reschedule(newPeriod);
+         blockOnIO();
       }
    }
 
    public int getMessageCounterMaxDayCount()
    {
-      return messageCounterManager.getMaxDayCount();
+      clearIO();
+      try
+      {
+         return messageCounterManager.getMaxDayCount();
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public void setMessageCounterMaxDayCount(final int count)
    {
-      if (count <= 0)
+      clearIO();
+      try
       {
-         throw new IllegalArgumentException("invalid value: count must be greater than 0");
+         if (count <= 0)
+         {
+            throw new IllegalArgumentException("invalid value: count must be greater than 0");
+         }
+         messageCounterManager.setMaxDayCount(count);
       }
-      messageCounterManager.setMaxDayCount(count);
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String[] listPreparedTransactions()
    {
-      DateFormat dateFormat = DateFormat.getDateTimeInstance(DateFormat.SHORT, DateFormat.MEDIUM);
+      clearIO();
+      try
+      {
+         DateFormat dateFormat = DateFormat.getDateTimeInstance(DateFormat.SHORT, DateFormat.MEDIUM);
 
-      Map<Xid, Long> xids = resourceManager.getPreparedTransactionsWithCreationTime();
-      ArrayList<Entry<Xid, Long>> xidsSortedByCreationTime = new ArrayList<Map.Entry<Xid, Long>>(xids.entrySet());
-      Collections.sort(xidsSortedByCreationTime, new Comparator<Entry<Xid, Long>>()
-      {
-         public int compare(Entry<Xid, Long> entry1, Entry<Xid, Long> entry2)
+         Map<Xid, Long> xids = resourceManager.getPreparedTransactionsWithCreationTime();
+         ArrayList<Entry<Xid, Long>> xidsSortedByCreationTime = new ArrayList<Map.Entry<Xid, Long>>(xids.entrySet());
+         Collections.sort(xidsSortedByCreationTime, new Comparator<Entry<Xid, Long>>()
          {
-            // sort by creation time, oldest first
-            return (int)(entry1.getValue() - entry2.getValue());
+            public int compare(final Entry<Xid, Long> entry1, final Entry<Xid, Long> entry2)
+            {
+               // sort by creation time, oldest first
+               return (int)(entry1.getValue() - entry2.getValue());
+            }
+         });
+         String[] s = new String[xidsSortedByCreationTime.size()];
+         int i = 0;
+         for (Map.Entry<Xid, Long> entry : xidsSortedByCreationTime)
+         {
+            Date creation = new Date(entry.getValue());
+            Xid xid = entry.getKey();
+            s[i++] = dateFormat.format(creation) + " base64: " + XidImpl.toBase64String(xid) + " " + xid.toString();
          }
-      });
-      String[] s = new String[xidsSortedByCreationTime.size()];
-      int i = 0;
-      for (Map.Entry<Xid, Long> entry : xidsSortedByCreationTime)
+         return s;
+      }
+      finally
       {
-         Date creation = new Date(entry.getValue());
-         Xid xid = entry.getKey();
-         s[i++] = dateFormat.format(creation) + " base64: " + XidImpl.toBase64String(xid) + " " + xid.toString();
+         blockOnIO();
       }
-      return s;
    }
 
    public String[] listHeuristicCommittedTransactions()
    {
-      List<Xid> xids = resourceManager.getHeuristicCommittedTransactions();
-      String[] s = new String[xids.size()];
-      int i = 0;
-      for (Xid xid : xids)
+      clearIO();
+      try
       {
-         s[i++] = XidImpl.toBase64String(xid);
+         List<Xid> xids = resourceManager.getHeuristicCommittedTransactions();
+         String[] s = new String[xids.size()];
+         int i = 0;
+         for (Xid xid : xids)
+         {
+            s[i++] = XidImpl.toBase64String(xid);
+         }
+         return s;
       }
-      return s;
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String[] listHeuristicRolledBackTransactions()
    {
-      List<Xid> xids = resourceManager.getHeuristicRolledbackTransactions();
-      String[] s = new String[xids.size()];
-      int i = 0;
-      for (Xid xid : xids)
+      clearIO();
+      try
       {
-         s[i++] = XidImpl.toBase64String(xid);
+         List<Xid> xids = resourceManager.getHeuristicRolledbackTransactions();
+         String[] s = new String[xids.size()];
+         int i = 0;
+         for (Xid xid : xids)
+         {
+            s[i++] = XidImpl.toBase64String(xid);
+         }
+         return s;
       }
-      return s;
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public synchronized boolean commitPreparedTransaction(final String transactionAsBase64) throws Exception
    {
-      List<Xid> xids = resourceManager.getPreparedTransactions();
+      clearIO();
+      try
+      {
+         List<Xid> xids = resourceManager.getPreparedTransactions();
 
-      for (Xid xid : xids)
-      {
-         if (XidImpl.toBase64String(xid).equals(transactionAsBase64))
+         for (Xid xid : xids)
          {
-            Transaction transaction = resourceManager.removeTransaction(xid);
-            transaction.commit(false);
-            server.getStorageManager().waitOnOperations();
-            long recordID = server.getStorageManager().storeHeuristicCompletion(xid, true);
-            resourceManager.putHeuristicCompletion(recordID, xid, true);
-            return true;
+            if (XidImpl.toBase64String(xid).equals(transactionAsBase64))
+            {
+               Transaction transaction = resourceManager.removeTransaction(xid);
+               transaction.commit(false);
+               long recordID = server.getStorageManager().storeHeuristicCompletion(xid, true);
+               storageManager.waitOnOperations();
+               resourceManager.putHeuristicCompletion(recordID, xid, true);
+               return true;
+            }
          }
+         return false;
       }
-      return false;
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public synchronized boolean rollbackPreparedTransaction(final String transactionAsBase64) throws Exception
    {
-      List<Xid> xids = resourceManager.getPreparedTransactions();
 
-      for (Xid xid : xids)
+      clearIO();
+      try
       {
-         if (XidImpl.toBase64String(xid).equals(transactionAsBase64))
+
+         List<Xid> xids = resourceManager.getPreparedTransactions();
+
+         for (Xid xid : xids)
          {
-            Transaction transaction = resourceManager.removeTransaction(xid);
-            transaction.rollback();
-            server.getStorageManager().waitOnOperations();
-            long recordID = server.getStorageManager().storeHeuristicCompletion(xid, false);
-            resourceManager.putHeuristicCompletion(recordID, xid, false);
-            return true;
+            if (XidImpl.toBase64String(xid).equals(transactionAsBase64))
+            {
+               Transaction transaction = resourceManager.removeTransaction(xid);
+               transaction.rollback();
+               long recordID = server.getStorageManager().storeHeuristicCompletion(xid, false);
+               server.getStorageManager().waitOnOperations();
+               resourceManager.putHeuristicCompletion(recordID, xid, false);
+               return true;
+            }
          }
+         return false;
       }
-      return false;
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String[] listRemoteAddresses()
    {
-      Set<RemotingConnection> connections = remotingService.getConnections();
+      clearIO();
+      try
+      {
+         Set<RemotingConnection> connections = remotingService.getConnections();
 
-      String[] remoteAddresses = new String[connections.size()];
-      int i = 0;
-      for (RemotingConnection connection : connections)
+         String[] remoteAddresses = new String[connections.size()];
+         int i = 0;
+         for (RemotingConnection connection : connections)
+         {
+            remoteAddresses[i++] = connection.getRemoteAddress();
+         }
+         return remoteAddresses;
+      }
+      finally
       {
-         remoteAddresses[i++] = connection.getRemoteAddress();
+         blockOnIO();
       }
-      return remoteAddresses;
+
    }
 
    public String[] listRemoteAddresses(final String ipAddress)
    {
-      Set<RemotingConnection> connections = remotingService.getConnections();
-      List<String> remoteConnections = new ArrayList<String>();
-      for (RemotingConnection connection : connections)
+      clearIO();
+      try
       {
-         String remoteAddress = connection.getRemoteAddress();
-         if (remoteAddress.contains(ipAddress))
+         Set<RemotingConnection> connections = remotingService.getConnections();
+         List<String> remoteConnections = new ArrayList<String>();
+         for (RemotingConnection connection : connections)
          {
-            remoteConnections.add(connection.getRemoteAddress());
+            String remoteAddress = connection.getRemoteAddress();
+            if (remoteAddress.contains(ipAddress))
+            {
+               remoteConnections.add(connection.getRemoteAddress());
+            }
          }
+         return remoteConnections.toArray(new String[remoteConnections.size()]);
       }
-      return (String[])remoteConnections.toArray(new String[remoteConnections.size()]);
+      finally
+      {
+         blockOnIO();
+      }
+
    }
 
    public synchronized boolean closeConnectionsForAddress(final String ipAddress)
    {
-      boolean closed = false;
-      Set<RemotingConnection> connections = remotingService.getConnections();
-      for (RemotingConnection connection : connections)
+      clearIO();
+      try
       {
-         String remoteAddress = connection.getRemoteAddress();
-         if (remoteAddress.contains(ipAddress))
+         boolean closed = false;
+         Set<RemotingConnection> connections = remotingService.getConnections();
+         for (RemotingConnection connection : connections)
          {
-            remotingService.removeConnection(connection.getID());
-            connection.fail(new HornetQException(HornetQException.INTERNAL_ERROR, "connections for " + ipAddress +
-                                                                                  " closed by management"));
-            closed = true;
+            String remoteAddress = connection.getRemoteAddress();
+            if (remoteAddress.contains(ipAddress))
+            {
+               remotingService.removeConnection(connection.getID());
+               connection.fail(new HornetQException(HornetQException.INTERNAL_ERROR, "connections for " + ipAddress +
+                                                                                     " closed by management"));
+               closed = true;
+            }
          }
+
+         return closed;
       }
+      finally
+      {
+         blockOnIO();
+      }
 
-      return closed;
    }
 
    public String[] listConnectionIDs()
    {
-      Set<RemotingConnection> connections = remotingService.getConnections();
-      String[] connectionIDs = new String[connections.size()];
-      int i = 0;
-      for (RemotingConnection connection : connections)
+      clearIO();
+      try
       {
-         connectionIDs[i++] = connection.getID().toString();
+         Set<RemotingConnection> connections = remotingService.getConnections();
+         String[] connectionIDs = new String[connections.size()];
+         int i = 0;
+         for (RemotingConnection connection : connections)
+         {
+            connectionIDs[i++] = connection.getID().toString();
+         }
+         return connectionIDs;
       }
-      return connectionIDs;
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String[] listSessions(final String connectionID)
    {
-      List<ServerSession> sessions = server.getSessions(connectionID);
-      String[] sessionIDs = new String[sessions.size()];
-      int i = 0;
-      for (ServerSession serverSession : sessions)
+      clearIO();
+      try
       {
-         sessionIDs[i++] = serverSession.getName();
+         List<ServerSession> sessions = server.getSessions(connectionID);
+         String[] sessionIDs = new String[sessions.size()];
+         int i = 0;
+         for (ServerSession serverSession : sessions)
+         {
+            sessionIDs[i++] = serverSession.getName();
+         }
+         return sessionIDs;
       }
-      return sessionIDs;
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public Object[] getConnectors() throws Exception
    {
-      Collection<TransportConfiguration> connectorConfigurations = configuration.getConnectorConfigurations().values();
+      clearIO();
+      try
+      {
+         Collection<TransportConfiguration> connectorConfigurations = configuration.getConnectorConfigurations()
+                                                                                   .values();
 
-      Object[] ret = new Object[connectorConfigurations.size()];
+         Object[] ret = new Object[connectorConfigurations.size()];
 
-      int i = 0;
-      for (TransportConfiguration config : connectorConfigurations)
-      {
-         Object[] tc = new Object[3];
+         int i = 0;
+         for (TransportConfiguration config : connectorConfigurations)
+         {
+            Object[] tc = new Object[3];
 
-         tc[0] = config.getName();
-         tc[1] = config.getFactoryClassName();
-         tc[2] = config.getParams();
+            tc[0] = config.getName();
+            tc[1] = config.getFactoryClassName();
+            tc[2] = config.getParams();
 
-         ret[i++] = tc;
+            ret[i++] = tc;
+         }
+
+         return ret;
       }
-
-      return ret;
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public String getConnectorsAsJSON() throws Exception
    {
-      JSONArray array = new JSONArray();
+      clearIO();
+      try
+      {
+         JSONArray array = new JSONArray();
 
-      for (TransportConfiguration config : configuration.getConnectorConfigurations().values())
+         for (TransportConfiguration config : configuration.getConnectorConfigurations().values())
+         {
+            array.put(new JSONObject(config));
+         }
+
+         return array.toString();
+      }
+      finally
       {
-         array.put(new JSONObject(config));
+         blockOnIO();
       }
-
-      return array.toString();
    }
 
    public void sendQueueInfoToQueue(final String queueName, final String address) throws Exception
    {
-      postOffice.sendQueueInfoToQueue(new SimpleString(queueName), new SimpleString(address));
-      // blocking on IO. Otherwise the method would return before the operation was finished
-      server.getStorageManager().waitOnOperations();
+      clearIO();
+      try
+      {
+         postOffice.sendQueueInfoToQueue(new SimpleString(queueName), new SimpleString(address));
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    // NotificationEmitter implementation ----------------------------
@@ -587,19 +1060,43 @@
                                           final NotificationFilter filter,
                                           final Object handback) throws ListenerNotFoundException
    {
-      broadcaster.removeNotificationListener(listener, filter, handback);
+      clearIO();
+      try
+      {
+         broadcaster.removeNotificationListener(listener, filter, handback);
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public void removeNotificationListener(final NotificationListener listener) throws ListenerNotFoundException
    {
-      broadcaster.removeNotificationListener(listener);
+      clearIO();
+      try
+      {
+         broadcaster.removeNotificationListener(listener);
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public void addNotificationListener(final NotificationListener listener,
                                        final NotificationFilter filter,
                                        final Object handback) throws IllegalArgumentException
    {
-      broadcaster.addNotificationListener(listener, filter, handback);
+      clearIO();
+      try
+      {
+         broadcaster.addNotificationListener(listener, filter, handback);
+      }
+      finally
+      {
+         blockOnIO();
+      }
    }
 
    public MBeanNotificationInfo[] getNotificationInfo()
@@ -621,7 +1118,7 @@
 
    // Private -------------------------------------------------------
 
-   private synchronized void setMessageCounterEnabled(boolean enable)
+   private synchronized void setMessageCounterEnabled(final boolean enable)
    {
       if (isStarted())
       {

Modified: trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java	2009-12-03 23:46:53 UTC (rev 8543)
+++ trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java	2009-12-03 23:54:22 UTC (rev 8544)
@@ -214,6 +214,7 @@
                                                             remotingService,
                                                             messagingServer,
                                                             messageCounterManager,
+                                                            storageManager,
                                                             broadcaster);
       ObjectName objectName = objectNameBuilder.getHornetQServerObjectName();
       registerInJMX(objectName, messagingServerControl);
@@ -232,7 +233,7 @@
    public synchronized void registerAddress(final SimpleString address) throws Exception
    {
       ObjectName objectName = objectNameBuilder.getAddressObjectName(address);
-      AddressControlImpl addressControl = new AddressControlImpl(address, postOffice, pagingManager, securityRepository);
+      AddressControlImpl addressControl = new AddressControlImpl(address, postOffice, pagingManager, storageManager, securityRepository);
 
       registerInJMX(objectName, addressControl);
 
@@ -293,7 +294,7 @@
    public synchronized void registerDivert(Divert divert, DivertConfiguration config) throws Exception
    {
       ObjectName objectName = objectNameBuilder.getDivertObjectName(divert.getUniqueName());
-      DivertControl divertControl = new DivertControlImpl(divert, config);
+      DivertControl divertControl = new DivertControlImpl(divert, storageManager, config);
       registerInJMX(objectName, new StandardMBean(divertControl, DivertControl.class));
       registerInRegistry(ResourceNames.CORE_DIVERT + config.getName(), divertControl);
 
@@ -313,7 +314,7 @@
    public synchronized void registerAcceptor(final Acceptor acceptor, final TransportConfiguration configuration) throws Exception
    {
       ObjectName objectName = objectNameBuilder.getAcceptorObjectName(configuration.getName());
-      AcceptorControl control = new AcceptorControlImpl(acceptor, configuration);
+      AcceptorControl control = new AcceptorControlImpl(acceptor, storageManager, configuration);
       registerInJMX(objectName, new StandardMBean(control, AcceptorControl.class));
       registerInRegistry(ResourceNames.CORE_ACCEPTOR + configuration.getName(), control);
    }
@@ -355,7 +356,7 @@
    {
       broadcastGroup.setNotificationService(this);
       ObjectName objectName = objectNameBuilder.getBroadcastGroupObjectName(configuration.getName());
-      BroadcastGroupControl control = new BroadcastGroupControlImpl(broadcastGroup, configuration);
+      BroadcastGroupControl control = new BroadcastGroupControlImpl(broadcastGroup, storageManager, configuration);
       registerInJMX(objectName, new StandardMBean(control, BroadcastGroupControl.class));
       registerInRegistry(ResourceNames.CORE_BROADCAST_GROUP + configuration.getName(), control);
    }
@@ -372,7 +373,7 @@
    {
       discoveryGroup.setNotificationService(this);
       ObjectName objectName = objectNameBuilder.getDiscoveryGroupObjectName(configuration.getName());
-      DiscoveryGroupControl control = new DiscoveryGroupControlImpl(discoveryGroup, configuration);
+      DiscoveryGroupControl control = new DiscoveryGroupControlImpl(discoveryGroup, storageManager, configuration);
       registerInJMX(objectName, new StandardMBean(control, DiscoveryGroupControl.class));
       registerInRegistry(ResourceNames.CORE_DISCOVERY_GROUP + configuration.getName(), control);
    }
@@ -388,7 +389,7 @@
    {
       bridge.setNotificationService(this);
       ObjectName objectName = objectNameBuilder.getBridgeObjectName(configuration.getName());
-      BridgeControl control = new BridgeControlImpl(bridge, configuration);
+      BridgeControl control = new BridgeControlImpl(bridge, storageManager, configuration);
       registerInJMX(objectName, new StandardMBean(control, BridgeControl.class));
       registerInRegistry(ResourceNames.CORE_BRIDGE + configuration.getName(), control);
    }
@@ -404,7 +405,7 @@
                                             final ClusterConnectionConfiguration configuration) throws Exception
    {
       ObjectName objectName = objectNameBuilder.getClusterConnectionObjectName(configuration.getName());
-      ClusterConnectionControl control = new ClusterConnectionControlImpl(cluster, configuration);
+      ClusterConnectionControl control = new ClusterConnectionControlImpl(cluster, storageManager, configuration);
       registerInJMX(objectName, new StandardMBean(control, ClusterConnectionControl.class));
       registerInRegistry(ResourceNames.CORE_CLUSTER_CONNECTION + configuration.getName(), control);
    }

Modified: trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java	2009-12-03 23:46:53 UTC (rev 8543)
+++ trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java	2009-12-03 23:54:22 UTC (rev 8544)
@@ -44,7 +44,7 @@
  * @version <tt>$Revision$</tt>
  * 
  */
-public class QueueControlImpl extends StandardMBean implements QueueControl
+public class QueueControlImpl extends AbstractControl implements QueueControl
 {
    // Constants -----------------------------------------------------
 
@@ -60,8 +60,6 @@
 
    private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
 
-   private final StorageManager storageManager;
-
    private MessageCounter counter;
 
    // Static --------------------------------------------------------
@@ -84,12 +82,11 @@
                            final StorageManager storageManager,
                            final HierarchicalRepository<AddressSettings> addressSettingsRepository) throws Exception
    {
-      super(QueueControl.class);
+      super(QueueControl.class, storageManager);
       this.queue = queue;
       this.address = address;
       this.postOffice = postOffice;
       this.addressSettingsRepository = addressSettingsRepository;
-      this.storageManager = storageManager;
    }
 
    // Public --------------------------------------------------------
@@ -711,23 +708,5 @@
 
    // Private -------------------------------------------------------
 
-   private void clearIO()
-   {
-      storageManager.clearContext();
-   }
-
-   private void blockOnIO()
-   {
-      try
-      {
-         storageManager.waitOnOperations();
-         storageManager.clearContext();
-      }
-      catch (Exception e)
-      {
-         throw new RuntimeException(e.getMessage(), e);
-      }
-   }
-
    // Inner classes -------------------------------------------------
 }

Modified: trunk/src/main/org/hornetq/core/persistence/OperationContext.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/OperationContext.java	2009-12-03 23:46:53 UTC (rev 8543)
+++ trunk/src/main/org/hornetq/core/persistence/OperationContext.java	2009-12-03 23:54:22 UTC (rev 8544)
@@ -38,5 +38,9 @@
 
    /** To be called when there are no more operations pending */
    void complete();
+   
+   void waitCompletion() throws Exception;
+   
+   boolean waitCompletion(long timeout) throws Exception;
 
 }

Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-12-03 23:46:53 UTC (rev 8543)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-12-03 23:54:22 UTC (rev 8544)
@@ -315,6 +315,11 @@
 
    public void waitOnOperations() throws Exception
    {
+      if (!started)
+      {
+         log.warn("Server is stopped");
+         throw new IllegalStateException("Server is stopped");
+      }
       waitOnOperations(0);
    }
 
@@ -323,16 +328,14 @@
     */
    public void waitOnOperations(final long timeout) throws Exception
    {
-      SimpleWaitIOCallback waitCallback = new SimpleWaitIOCallback();
-      afterCompleteOperations(waitCallback);
-      completeOperations();
-      if (timeout == 0)
+      if (!started)
       {
-         waitCallback.waitCompletion();
+         log.warn("Server is stopped");
+         throw new IllegalStateException("Server is stopped");
       }
-      else if (!waitCallback.waitCompletion(timeout))
+      if (!getContext().waitCompletion(timeout))
       {
-         throw new IllegalStateException("no response received from replication");
+         throw new HornetQException(HornetQException.IO_ERROR, "Timeout on waiting I/O completion");
       }
    }
 
@@ -1542,6 +1545,21 @@
       {
       }
 
+      /* (non-Javadoc)
+       * @see org.hornetq.core.persistence.OperationContext#waitCompletion()
+       */
+      public void waitCompletion()
+      {
+      }
+
+      /* (non-Javadoc)
+       * @see org.hornetq.core.persistence.OperationContext#waitCompletion(long)
+       */
+      public boolean waitCompletion(long timeout)
+      {
+         return true;
+      }
+
    }
 
    private static class XidEncoding implements EncodingSupport

Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java	2009-12-03 23:46:53 UTC (rev 8543)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java	2009-12-03 23:54:22 UTC (rev 8544)
@@ -21,6 +21,7 @@
 
 import org.hornetq.core.exception.HornetQException;
 import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.journal.impl.SimpleWaitIOCallback;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.persistence.OperationContext;
 import org.hornetq.utils.ExecutorFactory;
@@ -210,6 +211,7 @@
       }
       catch (Throwable e)
       {
+         e.printStackTrace();
          log.warn("Error on executor's submit");
          executorsPending.decrementAndGet();
          task.onError(HornetQException.INTERNAL_ERROR, "It wasn't possible to complete IO operation - " + e.getMessage());
@@ -262,4 +264,27 @@
       }
    }
 
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.OperationContext#waitCompletion()
+    */
+   public void waitCompletion() throws Exception
+   {
+      waitCompletion(0);
+   }
+
+   /* (non-Javadoc)
+    * @see org.hornetq.core.persistence.OperationContext#waitCompletion(long)
+    */
+   public boolean waitCompletion(long timeout) throws Exception
+   {
+      SimpleWaitIOCallback waitCallback = new SimpleWaitIOCallback();
+      executeOnCompletion(waitCallback);
+      complete();
+      if (timeout == 0)
+      {
+         waitCallback.waitCompletion();
+      }
+      return (waitCallback.waitCompletion(timeout));
+   }
+
 }

Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-12-03 23:46:53 UTC (rev 8543)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2009-12-03 23:54:22 UTC (rev 8544)
@@ -306,7 +306,7 @@
       {
          initialisePart2();
       }
-
+      
       // We start the remoting service here - if the server is a backup remoting service needs to be started
       // so it can be initialised by the live node
       remotingService.start();

Modified: trunk/tests/src/org/hornetq/tests/integration/client/OrderTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/OrderTest.java	2009-12-03 23:46:53 UTC (rev 8543)
+++ trunk/tests/src/org/hornetq/tests/integration/client/OrderTest.java	2009-12-03 23:54:22 UTC (rev 8544)
@@ -85,7 +85,7 @@
 
          for (int i = 0; i < 100; i++)
          {
-            ClientMessage msg = session.createClientMessage(i % 2 == 0);            
+            ClientMessage msg = session.createClientMessage(i % 2 == 0);
             msg.putIntProperty("id", i);
             prod.send(msg);
          }
@@ -103,7 +103,7 @@
                server.stop();
                server.start();
             }
-            
+
             session = sf.createSession(true, true);
 
             session.start();
@@ -144,6 +144,86 @@
 
    }
 
+   public void testOrderOverSessionClosePersistent() throws Exception
+   {
+      doTestOverCancel(true);
+   }
+
+   public void testOrderOverSessionCloseNonPersistent() throws Exception
+   {
+      doTestOverCancel(false);
+   }
+
+   public void doTestOverCancel(final boolean persistent) throws Exception
+   {
+      server = createServer(persistent, true);
+      server.start();
+
+      ClientSessionFactory sf = createNettyFactory();
+
+      sf.setBlockOnNonPersistentSend(false);
+      sf.setBlockOnPersistentSend(false);
+      sf.setBlockOnAcknowledge(false);
+
+      ClientSession session = sf.createSession(true, true, 0);
+
+      int numberOfMessages = 500;
+
+      try
+      {
+         session.createQueue("queue", "queue", true);
+
+         ClientProducer prod = session.createProducer("queue");
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage msg = session.createClientMessage(i % 2 == 0);
+            msg.putIntProperty("id", i);
+            prod.send(msg);
+         }
+
+         session.close();
+         
+         for (int i = 0 ; i < numberOfMessages;)
+         {
+            session = sf.createSession();
+            
+            session.start();
+            
+            ClientConsumer consumer = session.createConsumer("queue");
+            
+            int max = i + 10;
+            
+            for (;i < max; i++)
+            {
+               ClientMessage msg = consumer.receive(1000);
+               
+               msg.acknowledge();
+               
+               assertEquals(i, msg.getIntProperty("id").intValue());
+            }
+            
+            // Receive a few more messages but don't consume them
+            for (int j = 0 ; j < 10 && i < numberOfMessages; j++)
+            {
+               ClientMessage msg = consumer.receiveImmediate();
+               if (msg == null)
+               {
+                  break;
+               }
+            }
+            session.close();
+            
+         }
+      }
+      finally
+      {
+         sf.close();
+         session.close();
+      }
+
+   }
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: trunk/tests/src/org/hornetq/tests/integration/management/ManagementServiceImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/ManagementServiceImplTest.java	2009-12-03 23:46:53 UTC (rev 8543)
+++ trunk/tests/src/org/hornetq/tests/integration/management/ManagementServiceImplTest.java	2009-12-03 23:54:22 UTC (rev 8544)
@@ -26,6 +26,7 @@
 import org.hornetq.core.management.QueueControl;
 import org.hornetq.core.management.ResourceNames;
 import org.hornetq.core.management.impl.ManagementServiceImpl;
+import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
 import org.hornetq.core.server.HornetQ;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.core.server.Queue;
@@ -151,6 +152,7 @@
       Configuration conf  = new ConfigurationImpl();
       conf.setJMXManagementEnabled(false);
       ManagementServiceImpl managementService = new ManagementServiceImpl(null, conf);
+      managementService.setStorageManager(new NullStorageManager());
       
       SimpleString address = randomSimpleString();
       managementService.registerAddress(address);

Added: trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/OperationContextUnitTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/OperationContextUnitTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/OperationContextUnitTest.java	2009-12-03 23:54:22 UTC (rev 8544)
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.unit.core.persistence.impl;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.tests.util.UnitTestCase;
+
+/**
+ * A OperationContextUnitTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class OperationContextUnitTest extends UnitTestCase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testCaptureException() throws Exception
+   {
+      ExecutorService executor = Executors.newSingleThreadExecutor();
+      executor.shutdown();
+      
+      final CountDownLatch latch = new CountDownLatch(1);
+      
+      final OperationContextImpl impl = new OperationContextImpl(executor)
+      {
+         public void complete()
+         {
+            super.complete();
+            latch.countDown();
+         }
+
+      };
+      
+      impl.storeLineUp();
+      
+      final AtomicInteger numberOfFailures = new AtomicInteger(0);
+      
+      Thread t = new Thread()
+      {
+         public void run()
+         {
+            try
+            {
+               impl.waitCompletion(5000);
+            }
+            catch (Throwable e)
+            {
+               e.printStackTrace();
+               numberOfFailures.incrementAndGet();
+            }
+         }
+      };
+      
+      t.start();
+
+      // We need to wait for complete() to be called first, or the test would be invalid.
+      // We use a latch instead of forcing a sleep here
+      assertTrue(latch.await(5, TimeUnit.SECONDS));
+      
+      impl.done();
+      
+      t.join();
+      
+      assertEquals(1, numberOfFailures.get());
+   }
+   
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}



More information about the hornetq-commits mailing list