JBoss hornetq SVN: r8546 - trunk/tests/src/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-12-04 04:43:37 -0500 (Fri, 04 Dec 2009)
New Revision: 8546
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
Log:
test fix
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2009-12-04 00:16:17 UTC (rev 8545)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/GroupingFailoverTestBase.java 2009-12-04 09:43:37 UTC (rev 8546)
@@ -84,35 +84,19 @@
final CountDownLatch latch = new CountDownLatch(1);
- class MyListener implements FailureListener
- {
- public void connectionFailed(HornetQException me)
- {
- latch.countDown();
- }
- }
+ class MyListener implements FailureListener
+ {
+ public void connectionFailed(HornetQException me)
+ {
+ latch.countDown();
+ }
+ }
- final CountDownLatch latch2 = new CountDownLatch(1);
+ Map<String, MessageFlowRecord> records = ((ClusterConnectionImpl)getServer(1).getClusterManager().getClusterConnection(new SimpleString("cluster1"))).getRecords();
+ RemotingConnection rc = records.get("0").getBridge().getForwardingConnection() ;
+ rc.addFailureListener(new MyListener());
+ fail(rc, latch);
- class MyListener2 implements FailureListener
- {
- public void connectionFailed(HornetQException me)
- {
- latch2.countDown();
- }
- }
-
- Map<String, MessageFlowRecord> records = ((ClusterConnectionImpl)getServer(1).getClusterManager().getClusterConnection(new SimpleString("cluster1"))).getRecords();
- RemotingConnection rc = records.get("0").getBridge().getForwardingConnection() ;
- rc.addFailureListener(new MyListener());
- fail(rc, latch);
-
- records = ((ClusterConnectionImpl)getServer(0).getClusterManager().getClusterConnection(new SimpleString("cluster0"))).getRecords();
- rc = records.get("0").getBridge().getForwardingConnection() ;
- rc.addFailureListener(new MyListener2());
- fail(rc, latch);
-
-
waitForServerRestart(2);
setupSessionFactory(2, isNetty());
@@ -121,6 +105,10 @@
waitForBindings(2, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, false);
+
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+
waitForBindings(1, "queues.testaddress", 1, 1, false);
sendWithProperty(2, "queues.testaddress", 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
@@ -168,10 +156,11 @@
setupSessionFactory(1, isNetty());
createQueue(0, "queues.testaddress", "queue0", null, true);
- createQueue(1, "queues.testaddress", "queue0", null, true);
-
waitForBindings(0, "queues.testaddress", 1, 0, true);
+
+ createQueue(1, "queues.testaddress", "queue0", null, true);
+
waitForBindings(1, "queues.testaddress", 1, 0, true);
addConsumer(0, 0, "queue0", null);
@@ -202,25 +191,11 @@
}
}
- final CountDownLatch latch2 = new CountDownLatch(1);
-
- class MyListener2 implements FailureListener
- {
- public void connectionFailed(HornetQException me)
- {
- latch2.countDown();
- }
- }
-
Map<String, MessageFlowRecord> records = ((ClusterConnectionImpl)getServer(1).getClusterManager().getClusterConnection(new SimpleString("cluster1"))).getRecords();
RemotingConnection rc = records.get("0").getBridge().getForwardingConnection() ;
rc.addFailureListener(new MyListener());
fail(rc, latch);
- records = ((ClusterConnectionImpl)getServer(0).getClusterManager().getClusterConnection(new SimpleString("cluster0"))).getRecords();
- rc = records.get("0").getBridge().getForwardingConnection() ;
- rc.addFailureListener(new MyListener2());
- fail(rc, latch);
waitForServerRestart(2);
@@ -230,6 +205,10 @@
waitForBindings(2, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, false);
+
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+
waitForBindings(1, "queues.testaddress", 1, 1, false);
sendWithProperty(2, "queues.testaddress", 10, false, MessageImpl.HDR_GROUP_ID, new SimpleString("id1"));
15 years, 1 month
JBoss hornetq SVN: r8545 - trunk/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-12-03 19:16:17 -0500 (Thu, 03 Dec 2009)
New Revision: 8545
Modified:
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
Log:
temporarily comment out reset so the test passes
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-12-03 23:54:22 UTC (rev 8544)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-12-04 00:16:17 UTC (rev 8545)
@@ -495,7 +495,7 @@
public void reset() throws Exception
{
- clearBindings();
+ //clearBindings();
}
public void setBridge(final Bridge bridge)
15 years, 1 month
JBoss hornetq SVN: r8544 - in trunk: src/main/org/hornetq/core/management/impl and 6 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-12-03 18:54:22 -0500 (Thu, 03 Dec 2009)
New Revision: 8544
Added:
trunk/src/main/org/hornetq/core/management/impl/AbstractControl.java
trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/OperationContextUnitTest.java
Modified:
trunk/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java
trunk/src/main/org/hornetq/core/management/impl/AcceptorControlImpl.java
trunk/src/main/org/hornetq/core/management/impl/AddressControlImpl.java
trunk/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java
trunk/src/main/org/hornetq/core/management/impl/BroadcastGroupControlImpl.java
trunk/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java
trunk/src/main/org/hornetq/core/management/impl/DiscoveryGroupControlImpl.java
trunk/src/main/org/hornetq/core/management/impl/DivertControlImpl.java
trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
trunk/src/main/org/hornetq/core/persistence/OperationContext.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/OrderTest.java
trunk/tests/src/org/hornetq/tests/integration/management/ManagementServiceImplTest.java
Log:
Mainly making management controllers block on the storage manager
Modified: trunk/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java 2009-12-03 23:46:53 UTC (rev 8543)
+++ trunk/src/main/org/hornetq/core/journal/impl/SimpleWaitIOCallback.java 2009-12-03 23:54:22 UTC (rev 8544)
@@ -60,12 +60,20 @@
{
throw new HornetQException(errorCode, errorMessage);
}
+
return;
}
public boolean waitCompletion(final long timeout) throws Exception
{
- return latch.await(timeout, TimeUnit.MILLISECONDS);
+ boolean retValue = latch.await(timeout, TimeUnit.MILLISECONDS);
+
+ if (errorMessage != null)
+ {
+ throw new HornetQException(errorCode, errorMessage);
+ }
+
+ return retValue;
}
/* (non-Javadoc)
Added: trunk/src/main/org/hornetq/core/management/impl/AbstractControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/AbstractControl.java (rev 0)
+++ trunk/src/main/org/hornetq/core/management/impl/AbstractControl.java 2009-12-03 23:54:22 UTC (rev 8544)
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.management.impl;
+
+import javax.management.NotCompliantMBeanException;
+import javax.management.StandardMBean;
+
+import org.hornetq.core.persistence.StorageManager;
+
+/**
+ * A AbstractControl
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public abstract class AbstractControl extends StandardMBean
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ protected final StorageManager storageManager;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public AbstractControl(Class<?> clazz, StorageManager storageManager) throws NotCompliantMBeanException
+ {
+ super(clazz);
+ this.storageManager = storageManager;
+ }
+
+ // Public --------------------------------------------------------
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ protected void clearIO()
+ {
+ // the storage manager could be null on the backup on certain components
+ if (storageManager != null)
+ {
+ storageManager.clearContext();
+ }
+ }
+
+ protected void blockOnIO()
+ {
+ // the storage manager could be null on the backup on certain components
+ if (storageManager != null)
+ {
+ try
+ {
+ storageManager.waitOnOperations();
+ storageManager.clearContext();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/src/main/org/hornetq/core/management/impl/AcceptorControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/AcceptorControlImpl.java 2009-12-03 23:46:53 UTC (rev 8543)
+++ trunk/src/main/org/hornetq/core/management/impl/AcceptorControlImpl.java 2009-12-03 23:54:22 UTC (rev 8544)
@@ -19,6 +19,7 @@
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.management.AcceptorControl;
+import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.remoting.spi.Acceptor;
/**
@@ -28,7 +29,7 @@
*
* Created 11 dec. 2008 17:09:04
*/
-public class AcceptorControlImpl extends StandardMBean implements AcceptorControl
+public class AcceptorControlImpl extends AbstractControl implements AcceptorControl
{
// Constants -----------------------------------------------------
@@ -43,10 +44,11 @@
// Constructors --------------------------------------------------
- public AcceptorControlImpl(final Acceptor acceptor, final TransportConfiguration configuration)
- throws Exception
+ public AcceptorControlImpl(final Acceptor acceptor,
+ final StorageManager storageManager,
+ final TransportConfiguration configuration) throws Exception
{
- super(AcceptorControl.class);
+ super(AcceptorControl.class, storageManager);
this.acceptor = acceptor;
this.configuration = configuration;
}
@@ -55,32 +57,80 @@
public String getFactoryClassName()
{
- return configuration.getFactoryClassName();
+ clearIO();
+ try
+ {
+ return configuration.getFactoryClassName();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public String getName()
{
- return configuration.getName();
+ clearIO();
+ try
+ {
+ return configuration.getName();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public Map<String, Object> getParameters()
{
- return configuration.getParams();
+ clearIO();
+ try
+ {
+ return configuration.getParams();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public boolean isStarted()
{
- return acceptor.isStarted();
+ clearIO();
+ try
+ {
+ return acceptor.isStarted();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public void start() throws Exception
{
- acceptor.start();
+ clearIO();
+ try
+ {
+ acceptor.start();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
-
+
public void stop() throws Exception
{
- acceptor.stop();
+ clearIO();
+ try
+ {
+ acceptor.stop();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
// Public --------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/management/impl/AddressControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/AddressControlImpl.java 2009-12-03 23:46:53 UTC (rev 8543)
+++ trunk/src/main/org/hornetq/core/management/impl/AddressControlImpl.java 2009-12-03 23:54:22 UTC (rev 8544)
@@ -21,6 +21,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.AddressControl;
import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.PostOffice;
@@ -37,7 +38,7 @@
* @version <tt>$Revision$</tt>
*
*/
-public class AddressControlImpl extends StandardMBean implements AddressControl
+public class AddressControlImpl extends AbstractControl implements AddressControl
{
// Constants -----------------------------------------------------
@@ -49,7 +50,7 @@
private final SimpleString address;
private final PostOffice postOffice;
-
+
private final PagingManager pagingManager;
private final HierarchicalRepository<Set<Role>> securityRepository;
@@ -60,11 +61,11 @@
public AddressControlImpl(final SimpleString address,
final PostOffice postOffice,
- final PagingManager pagingManager,
- final HierarchicalRepository<Set<Role>> securityRepository)
- throws Exception
+ final PagingManager pagingManager,
+ final StorageManager storageManager,
+ final HierarchicalRepository<Set<Role>> securityRepository) throws Exception
{
- super(AddressControl.class);
+ super(AddressControl.class, storageManager);
this.address = address;
this.postOffice = postOffice;
this.pagingManager = pagingManager;
@@ -82,6 +83,7 @@
public String[] getQueueNames() throws Exception
{
+ clearIO();
try
{
Bindings bindings = postOffice.getBindingsForAddress(address);
@@ -97,49 +99,85 @@
{
throw new IllegalStateException(t.getMessage());
}
+ finally
+ {
+ blockOnIO();
+ }
}
public Object[] getRoles() throws Exception
{
- Set<Role> roles = securityRepository.getMatch(address.toString());
+ clearIO();
+ try
+ {
+ Set<Role> roles = securityRepository.getMatch(address.toString());
- Object[] objRoles = new Object[roles.size()];
+ Object[] objRoles = new Object[roles.size()];
- int i = 0;
- for (Role role : roles)
+ int i = 0;
+ for (Role role : roles)
+ {
+ objRoles[i++] = new Object[] { role.getName(),
+ CheckType.SEND.hasRole(role),
+ CheckType.CONSUME.hasRole(role),
+ CheckType.CREATE_DURABLE_QUEUE.hasRole(role),
+ CheckType.DELETE_DURABLE_QUEUE.hasRole(role),
+ CheckType.CREATE_NON_DURABLE_QUEUE.hasRole(role),
+ CheckType.DELETE_NON_DURABLE_QUEUE.hasRole(role),
+ CheckType.MANAGE.hasRole(role) };
+ }
+ return objRoles;
+ }
+ finally
{
- objRoles[i++] = new Object[] { role.getName(),
- CheckType.SEND.hasRole(role),
- CheckType.CONSUME.hasRole(role),
- CheckType.CREATE_DURABLE_QUEUE.hasRole(role),
- CheckType.DELETE_DURABLE_QUEUE.hasRole(role),
- CheckType.CREATE_NON_DURABLE_QUEUE.hasRole(role),
- CheckType.DELETE_NON_DURABLE_QUEUE.hasRole(role),
- CheckType.MANAGE.hasRole(role) };
+ blockOnIO();
}
- return objRoles;
}
public String getRolesAsJSON() throws Exception
{
- JSONArray json = new JSONArray();
- Set<Role> roles = securityRepository.getMatch(address.toString());
+ clearIO();
+ try
+ {
+ JSONArray json = new JSONArray();
+ Set<Role> roles = securityRepository.getMatch(address.toString());
- for (Role role : roles)
+ for (Role role : roles)
+ {
+ json.put(new JSONObject(role));
+ }
+ return json.toString();
+ }
+ finally
{
- json.put(new JSONObject(role));
+ blockOnIO();
}
- return json.toString();
}
-
+
public long getNumberOfBytesPerPage() throws Exception
{
- return pagingManager.getPageStore(address).getPageSizeBytes();
+ clearIO();
+ try
+ {
+ return pagingManager.getPageStore(address).getPageSizeBytes();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public int getNumberOfPages() throws Exception
{
- return pagingManager.getPageStore(address).getNumberOfPages();
+ clearIO();
+ try
+ {
+ return pagingManager.getPageStore(address).getNumberOfPages();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public synchronized void addRole(final String name,
@@ -151,43 +189,59 @@
final boolean deleteNonDurableQueue,
final boolean manage) throws Exception
{
- Set<Role> roles = securityRepository.getMatch(address.toString());
- Role newRole = new Role(name,
- send,
- consume,
- createDurableQueue,
- deleteDurableQueue,
- createNonDurableQueue,
- deleteNonDurableQueue,
- manage);
- boolean added = roles.add(newRole);
- if (!added)
+ clearIO();
+ try
{
- throw new IllegalArgumentException("Role " + name + " already exists");
+ Set<Role> roles = securityRepository.getMatch(address.toString());
+ Role newRole = new Role(name,
+ send,
+ consume,
+ createDurableQueue,
+ deleteDurableQueue,
+ createNonDurableQueue,
+ deleteNonDurableQueue,
+ manage);
+ boolean added = roles.add(newRole);
+ if (!added)
+ {
+ throw new IllegalArgumentException("Role " + name + " already exists");
+ }
+ securityRepository.addMatch(address.toString(), roles);
}
- securityRepository.addMatch(address.toString(), roles);
+ finally
+ {
+ blockOnIO();
+ }
}
public synchronized void removeRole(final String role) throws Exception
{
- Set<Role> roles = securityRepository.getMatch(address.toString());
- Iterator<Role> it = roles.iterator();
- boolean removed = false;
- while (it.hasNext())
+ clearIO();
+ try
{
- Role r = it.next();
- if (r.getName().equals(role))
+ Set<Role> roles = securityRepository.getMatch(address.toString());
+ Iterator<Role> it = roles.iterator();
+ boolean removed = false;
+ while (it.hasNext())
{
- it.remove();
- removed = true;
- break;
+ Role r = it.next();
+ if (r.getName().equals(role))
+ {
+ it.remove();
+ removed = true;
+ break;
+ }
}
+ if (!removed)
+ {
+ throw new IllegalArgumentException("Role " + role + " does not exist");
+ }
+ securityRepository.addMatch(address.toString(), roles);
}
- if (!removed)
+ finally
{
- throw new IllegalArgumentException("Role " + role + " does not exist");
+ blockOnIO();
}
- securityRepository.addMatch(address.toString(), roles);
}
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java 2009-12-03 23:46:53 UTC (rev 8543)
+++ trunk/src/main/org/hornetq/core/management/impl/BridgeControlImpl.java 2009-12-03 23:54:22 UTC (rev 8544)
@@ -13,10 +13,9 @@
package org.hornetq.core.management.impl;
-import javax.management.StandardMBean;
-
import org.hornetq.core.config.cluster.BridgeConfiguration;
import org.hornetq.core.management.BridgeControl;
+import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.cluster.Bridge;
/**
@@ -26,7 +25,7 @@
*
* Created 11 dec. 2008 17:09:04
*/
-public class BridgeControlImpl extends StandardMBean implements BridgeControl
+public class BridgeControlImpl extends AbstractControl implements BridgeControl
{
// Constants -----------------------------------------------------
@@ -41,9 +40,11 @@
// Constructors --------------------------------------------------
- public BridgeControlImpl(final Bridge bridge, final BridgeConfiguration configuration) throws Exception
+ public BridgeControlImpl(final Bridge bridge,
+ final StorageManager storageManager,
+ final BridgeConfiguration configuration) throws Exception
{
- super(BridgeControl.class);
+ super(BridgeControl.class, storageManager);
this.bridge = bridge;
this.configuration = configuration;
}
@@ -52,82 +53,202 @@
public String[] getConnectorPair() throws Exception
{
- String[] pair = new String[2];
+ clearIO();
+ try
+ {
+ String[] pair = new String[2];
- pair[0] = configuration.getConnectorPair().a;
- pair[1] = configuration.getConnectorPair().b != null ? configuration.getConnectorPair().b : null;
+ pair[0] = configuration.getConnectorPair().a;
+ pair[1] = configuration.getConnectorPair().b != null ? configuration.getConnectorPair().b : null;
- return pair;
+ return pair;
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public String getForwardingAddress()
{
- return configuration.getForwardingAddress();
+ clearIO();
+ try
+ {
+ return configuration.getForwardingAddress();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public String getQueueName()
{
- return configuration.getQueueName();
+ clearIO();
+ try
+ {
+ return configuration.getQueueName();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public String getDiscoveryGroupName()
{
- return configuration.getDiscoveryGroupName();
+ clearIO();
+ try
+ {
+ return configuration.getDiscoveryGroupName();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public String getFilterString()
{
- return configuration.getFilterString();
+ clearIO();
+ try
+ {
+ return configuration.getFilterString();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public int getReconnectAttempts()
{
- return configuration.getReconnectAttempts();
+ clearIO();
+ try
+ {
+ return configuration.getReconnectAttempts();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public boolean isFailoverOnServerShutdown()
{
- return configuration.isFailoverOnServerShutdown();
+ clearIO();
+ try
+ {
+ return configuration.isFailoverOnServerShutdown();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public String getName()
{
- return configuration.getName();
+ clearIO();
+ try
+ {
+ return configuration.getName();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public long getRetryInterval()
{
- return configuration.getRetryInterval();
+ clearIO();
+ try
+ {
+ return configuration.getRetryInterval();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public double getRetryIntervalMultiplier()
{
- return configuration.getRetryIntervalMultiplier();
+ clearIO();
+ try
+ {
+ return configuration.getRetryIntervalMultiplier();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public String getTransformerClassName()
{
- return configuration.getTransformerClassName();
+ clearIO();
+ try
+ {
+ return configuration.getTransformerClassName();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public boolean isStarted()
{
- return bridge.isStarted();
+ clearIO();
+ try
+ {
+ return bridge.isStarted();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public boolean isUseDuplicateDetection()
{
- return configuration.isUseDuplicateDetection();
+ clearIO();
+ try
+ {
+ return configuration.isUseDuplicateDetection();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public void start() throws Exception
{
- bridge.start();
+ clearIO();
+ try
+ {
+ bridge.start();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public void stop() throws Exception
{
- bridge.stop();
+ clearIO();
+ try
+ {
+ bridge.stop();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
// Public --------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/management/impl/BroadcastGroupControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/BroadcastGroupControlImpl.java 2009-12-03 23:46:53 UTC (rev 8543)
+++ trunk/src/main/org/hornetq/core/management/impl/BroadcastGroupControlImpl.java 2009-12-03 23:54:22 UTC (rev 8544)
@@ -17,6 +17,7 @@
import org.hornetq.core.config.cluster.BroadcastGroupConfiguration;
import org.hornetq.core.management.BroadcastGroupControl;
+import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.cluster.BroadcastGroup;
import org.hornetq.utils.Pair;
import org.hornetq.utils.json.JSONArray;
@@ -29,7 +30,7 @@
*
* Created 11 dec. 2008 17:09:04
*/
-public class BroadcastGroupControlImpl extends StandardMBean implements BroadcastGroupControl
+public class BroadcastGroupControlImpl extends AbstractControl implements BroadcastGroupControl
{
// Constants -----------------------------------------------------
@@ -44,88 +45,169 @@
// Constructors --------------------------------------------------
- public BroadcastGroupControlImpl(final BroadcastGroup broadcastGroup, final BroadcastGroupConfiguration configuration)
- throws Exception
+ public BroadcastGroupControlImpl(final BroadcastGroup broadcastGroup,
+ final StorageManager storageManager,
+ final BroadcastGroupConfiguration configuration) throws Exception
{
- super(BroadcastGroupControl.class);
+ super(BroadcastGroupControl.class, storageManager);
this.broadcastGroup = broadcastGroup;
this.configuration = configuration;
}
// BroadcastGroupControlMBean implementation ---------------------
-
+
public String getName()
{
- return configuration.getName();
+ clearIO();
+ try
+ {
+ return configuration.getName();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public long getBroadcastPeriod()
{
- return configuration.getBroadcastPeriod();
+ clearIO();
+ try
+ {
+ return configuration.getBroadcastPeriod();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public Object[] getConnectorPairs()
{
- Object[] ret = new Object[configuration.getConnectorInfos().size()];
-
- int i = 0;
- for (Pair<String, String> pair: configuration.getConnectorInfos())
+ clearIO();
+ try
{
- String[] opair = new String[2];
-
- opair[0] = pair.a;
- opair[1] = pair.b != null ? pair.b : null;
-
- ret[i++] = opair;
+ Object[] ret = new Object[configuration.getConnectorInfos().size()];
+
+ int i = 0;
+ for (Pair<String, String> pair : configuration.getConnectorInfos())
+ {
+ String[] opair = new String[2];
+
+ opair[0] = pair.a;
+ opair[1] = pair.b != null ? pair.b : null;
+
+ ret[i++] = opair;
+ }
+
+ return ret;
}
-
- return ret;
+ finally
+ {
+ blockOnIO();
+ }
}
-
+
public String getConnectorPairsAsJSON() throws Exception
{
- JSONArray array = new JSONArray();
-
- for (Pair<String, String> pair: configuration.getConnectorInfos())
+ clearIO();
+ try
{
- JSONObject p = new JSONObject();
- p.put("a", pair.a);
- p.put("b", pair.b);
- array.put(p);
+ JSONArray array = new JSONArray();
+
+ for (Pair<String, String> pair : configuration.getConnectorInfos())
+ {
+ JSONObject p = new JSONObject();
+ p.put("a", pair.a);
+ p.put("b", pair.b);
+ array.put(p);
+ }
+ return array.toString();
}
- return array.toString();
+ finally
+ {
+ blockOnIO();
+ }
}
public String getGroupAddress()
{
- return configuration.getGroupAddress();
+ clearIO();
+ try
+ {
+ return configuration.getGroupAddress();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public int getGroupPort()
{
- return configuration.getGroupPort();
+ clearIO();
+ try
+ {
+ return configuration.getGroupPort();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public int getLocalBindPort()
{
- return configuration.getLocalBindPort();
+ clearIO();
+ try
+ {
+ return configuration.getLocalBindPort();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
// MessagingComponentControlMBean implementation -----------------
public boolean isStarted()
{
- return broadcastGroup.isStarted();
+ clearIO();
+ try
+ {
+ return broadcastGroup.isStarted();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public void start() throws Exception
{
- broadcastGroup.start();
+ clearIO();
+ try
+ {
+ broadcastGroup.start();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public void stop() throws Exception
{
- broadcastGroup.stop();
+ clearIO();
+ try
+ {
+ broadcastGroup.stop();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
// Public --------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java 2009-12-03 23:46:53 UTC (rev 8543)
+++ trunk/src/main/org/hornetq/core/management/impl/ClusterConnectionControlImpl.java 2009-12-03 23:54:22 UTC (rev 8544)
@@ -20,6 +20,7 @@
import org.hornetq.core.config.cluster.ClusterConnectionConfiguration;
import org.hornetq.core.management.ClusterConnectionControl;
+import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.utils.Pair;
import org.hornetq.utils.json.JSONArray;
@@ -30,7 +31,7 @@
*
* @author <a href="jmesnil(a)redhat.com">Jeff Mesnil</a>
*/
-public class ClusterConnectionControlImpl extends StandardMBean implements ClusterConnectionControl
+public class ClusterConnectionControlImpl extends AbstractControl implements ClusterConnectionControl
{
// Constants -----------------------------------------------------
@@ -46,9 +47,10 @@
// Constructors --------------------------------------------------
public ClusterConnectionControlImpl(final ClusterConnection clusterConnection,
+ final StorageManager storageManager,
ClusterConnectionConfiguration configuration) throws Exception
{
- super(ClusterConnectionControl.class);
+ super(ClusterConnectionControl.class, storageManager);
this.clusterConnection = clusterConnection;
this.configuration = configuration;
}
@@ -57,108 +59,225 @@
public String getAddress()
{
- return configuration.getAddress();
+ clearIO();
+ try
+ {
+ return configuration.getAddress();
+ }
+ finally
+ {
+ blockOnIO();
+ }
+
}
public String getDiscoveryGroupName()
{
- return configuration.getDiscoveryGroupName();
+ clearIO();
+ try
+ {
+ return configuration.getDiscoveryGroupName();
+ }
+ finally
+ {
+ blockOnIO();
+ }
+
}
public int getMaxHops()
{
- return configuration.getMaxHops();
+ clearIO();
+ try
+ {
+ return configuration.getMaxHops();
+ }
+ finally
+ {
+ blockOnIO();
+ }
+
}
public String getName()
{
- return configuration.getName();
+ clearIO();
+ try
+ {
+ return configuration.getName();
+ }
+ finally
+ {
+ blockOnIO();
+ }
+
}
public long getRetryInterval()
{
- return configuration.getRetryInterval();
+ clearIO();
+ try
+ {
+ return configuration.getRetryInterval();
+ }
+ finally
+ {
+ blockOnIO();
+ }
+
}
-
+
public String getNodeID()
{
- return clusterConnection.getNodeID();
+ clearIO();
+ try
+ {
+ return clusterConnection.getNodeID();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public Object[] getStaticConnectorNamePairs()
{
- List<Pair<String, String>> pairs = configuration.getStaticConnectorNamePairs();
-
- if (pairs == null)
+ clearIO();
+ try
{
- return null;
- }
-
- Object[] ret = new Object[pairs.size()];
+ List<Pair<String, String>> pairs = configuration.getStaticConnectorNamePairs();
- int i = 0;
- for (Pair<String, String> pair : configuration.getStaticConnectorNamePairs())
- {
- String[] opair = new String[2];
+ if (pairs == null)
+ {
+ return null;
+ }
- opair[0] = pair.a;
- opair[1] = pair.b != null ? pair.b : null;
+ Object[] ret = new Object[pairs.size()];
- ret[i++] = opair;
+ int i = 0;
+ for (Pair<String, String> pair : configuration.getStaticConnectorNamePairs())
+ {
+ String[] opair = new String[2];
+
+ opair[0] = pair.a;
+ opair[1] = pair.b != null ? pair.b : null;
+
+ ret[i++] = opair;
+ }
+
+ return ret;
}
-
- return ret;
+ finally
+ {
+ blockOnIO();
+ }
}
public String getStaticConnectorNamePairsAsJSON() throws Exception
{
- List<Pair<String, String>> pairs = configuration.getStaticConnectorNamePairs();
-
- if (pairs == null)
+ clearIO();
+ try
{
- return null;
+ List<Pair<String, String>> pairs = configuration.getStaticConnectorNamePairs();
+
+ if (pairs == null)
+ {
+ return null;
+ }
+
+ JSONArray array = new JSONArray();
+
+ for (Pair<String, String> pair : pairs)
+ {
+ JSONObject p = new JSONObject();
+ p.put("a", pair.a);
+ p.put("b", pair.b);
+ array.put(p);
+ }
+ return array.toString();
}
-
- JSONArray array = new JSONArray();
-
- for (Pair<String, String> pair : pairs)
+ finally
{
- JSONObject p = new JSONObject();
- p.put("a", pair.a);
- p.put("b", pair.b);
- array.put(p);
+ blockOnIO();
}
- return array.toString();
}
public boolean isDuplicateDetection()
{
- return configuration.isDuplicateDetection();
+ clearIO();
+ try
+ {
+ return configuration.isDuplicateDetection();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public boolean isForwardWhenNoConsumers()
{
- return configuration.isForwardWhenNoConsumers();
+ clearIO();
+ try
+ {
+ return configuration.isForwardWhenNoConsumers();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public Map<String, String> getNodes() throws Exception
{
- return clusterConnection.getNodes();
+ clearIO();
+ try
+ {
+ return clusterConnection.getNodes();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
-
+
public boolean isStarted()
{
- return clusterConnection.isStarted();
+ clearIO();
+ try
+ {
+ return clusterConnection.isStarted();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public void start() throws Exception
{
- clusterConnection.start();
+ clearIO();
+ try
+ {
+ clusterConnection.start();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public void stop() throws Exception
{
- clusterConnection.stop();
+ clearIO();
+ try
+ {
+ clusterConnection.stop();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
// Public --------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/management/impl/DiscoveryGroupControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/DiscoveryGroupControlImpl.java 2009-12-03 23:46:53 UTC (rev 8543)
+++ trunk/src/main/org/hornetq/core/management/impl/DiscoveryGroupControlImpl.java 2009-12-03 23:54:22 UTC (rev 8544)
@@ -18,6 +18,7 @@
import org.hornetq.core.cluster.DiscoveryGroup;
import org.hornetq.core.config.cluster.DiscoveryGroupConfiguration;
import org.hornetq.core.management.DiscoveryGroupControl;
+import org.hornetq.core.persistence.StorageManager;
/**
* A AcceptorControl
@@ -26,7 +27,7 @@
*
* Created 11 dec. 2008 17:09:04
*/
-public class DiscoveryGroupControlImpl extends StandardMBean implements DiscoveryGroupControl
+public class DiscoveryGroupControlImpl extends AbstractControl implements DiscoveryGroupControl
{
// Constants -----------------------------------------------------
@@ -41,10 +42,11 @@
// Constructors --------------------------------------------------
- public DiscoveryGroupControlImpl(final DiscoveryGroup acceptor, final DiscoveryGroupConfiguration configuration)
- throws Exception
+ public DiscoveryGroupControlImpl(final DiscoveryGroup acceptor,
+ final StorageManager storageManager,
+ final DiscoveryGroupConfiguration configuration) throws Exception
{
- super(DiscoveryGroupControl.class);
+ super(DiscoveryGroupControl.class, storageManager);
this.discoveryGroup = acceptor;
this.configuration = configuration;
}
@@ -53,37 +55,99 @@
public String getName()
{
- return configuration.getName();
+ clearIO();
+ try
+ {
+ return configuration.getName();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public String getGroupAddress()
{
- return configuration.getGroupAddress();
+ clearIO();
+ try
+ {
+ return configuration.getGroupAddress();
+ }
+ finally
+ {
+ blockOnIO();
+ }
+
}
public int getGroupPort()
{
- return configuration.getGroupPort();
+ clearIO();
+ try
+ {
+ return configuration.getGroupPort();
+ }
+ finally
+ {
+ blockOnIO();
+ }
+
}
public long getRefreshTimeout()
{
- return configuration.getRefreshTimeout();
+ clearIO();
+ try
+ {
+ return configuration.getRefreshTimeout();
+ }
+ finally
+ {
+ blockOnIO();
+ }
+
}
public boolean isStarted()
{
- return discoveryGroup.isStarted();
+ clearIO();
+ try
+ {
+ return discoveryGroup.isStarted();
+ }
+ finally
+ {
+ blockOnIO();
+ }
+
}
public void start() throws Exception
{
- discoveryGroup.start();
+ clearIO();
+ try
+ {
+ discoveryGroup.start();
+ }
+ finally
+ {
+ blockOnIO();
+ }
+
}
public void stop() throws Exception
{
- discoveryGroup.stop();
+ clearIO();
+ try
+ {
+ discoveryGroup.stop();
+ }
+ finally
+ {
+ blockOnIO();
+ }
+
}
// Public --------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/management/impl/DivertControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/DivertControlImpl.java 2009-12-03 23:46:53 UTC (rev 8543)
+++ trunk/src/main/org/hornetq/core/management/impl/DivertControlImpl.java 2009-12-03 23:54:22 UTC (rev 8544)
@@ -17,6 +17,7 @@
import org.hornetq.core.config.cluster.DivertConfiguration;
import org.hornetq.core.management.DivertControl;
+import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.server.Divert;
/**
@@ -26,7 +27,7 @@
*
* Created 11 dec. 2008 17:09:04
*/
-public class DivertControlImpl extends StandardMBean implements DivertControl
+public class DivertControlImpl extends AbstractControl implements DivertControl
{
// Constants -----------------------------------------------------
@@ -43,47 +44,104 @@
// DivertControlMBean implementation ---------------------------
- public DivertControlImpl(final Divert divert, final DivertConfiguration configuration)
- throws Exception
+ public DivertControlImpl(final Divert divert,
+ final StorageManager storageManager,
+ final DivertConfiguration configuration) throws Exception
{
- super(DivertControl.class);
+ super(DivertControl.class, storageManager);
this.divert = divert;
this.configuration = configuration;
}
public String getAddress()
{
- return configuration.getAddress();
+ clearIO();
+ try
+ {
+ return configuration.getAddress();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public String getFilter()
{
- return configuration.getFilterString();
+ clearIO();
+ try
+ {
+ return configuration.getFilterString();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public String getForwardingAddress()
{
- return configuration.getForwardingAddress();
+ clearIO();
+ try
+ {
+ return configuration.getForwardingAddress();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public String getRoutingName()
{
- return divert.getRoutingName().toString();
+ clearIO();
+ try
+ {
+ return divert.getRoutingName().toString();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public String getTransformerClassName()
{
- return configuration.getTransformerClassName();
+ clearIO();
+ try
+ {
+ return configuration.getTransformerClassName();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public String getUniqueName()
{
- return divert.getUniqueName().toString();
+ clearIO();
+ try
+ {
+ return divert.getUniqueName().toString();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public boolean isExclusive()
{
- return divert.isExclusive();
+ clearIO();
+ try
+ {
+ return divert.isExclusive();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
// Public --------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2009-12-03 23:46:53 UTC (rev 8543)
+++ trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2009-12-03 23:54:22 UTC (rev 8544)
@@ -30,7 +30,6 @@
import javax.management.NotificationEmitter;
import javax.management.NotificationFilter;
import javax.management.NotificationListener;
-import javax.management.StandardMBean;
import javax.transaction.xa.Xid;
import org.hornetq.core.config.Configuration;
@@ -43,6 +42,7 @@
import org.hornetq.core.management.QueueControl;
import org.hornetq.core.messagecounter.MessageCounterManager;
import org.hornetq.core.messagecounter.impl.MessageCounterManagerImpl;
+import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.server.RemotingService;
@@ -62,7 +62,7 @@
* @version <tt>$Revision$</tt>
*
*/
-public class HornetQServerControlImpl extends StandardMBean implements HornetQServerControl, NotificationEmitter
+public class HornetQServerControlImpl extends AbstractControl implements HornetQServerControl, NotificationEmitter
{
// Constants -----------------------------------------------------
@@ -94,14 +94,15 @@
final RemotingService remotingService,
final HornetQServer messagingServer,
final MessageCounterManager messageCounterManager,
+ final StorageManager storageManager,
final NotificationBroadcasterSupport broadcaster) throws Exception
{
- super(HornetQServerControl.class);
+ super(HornetQServerControl.class, storageManager);
this.postOffice = postOffice;
this.configuration = configuration;
this.resourceManager = resourceManager;
this.remotingService = remotingService;
- this.server = messagingServer;
+ server = messagingServer;
this.messageCounterManager = messageCounterManager;
this.broadcaster = broadcaster;
}
@@ -117,468 +118,940 @@
public boolean isStarted()
{
- return server.isStarted();
+ clearIO();
+ try
+ {
+ return server.isStarted();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public String getVersion()
{
- return server.getVersion().getFullVersion();
+ clearIO();
+ try
+ {
+ return server.getVersion().getFullVersion();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public boolean isBackup()
{
- return configuration.isBackup();
+ clearIO();
+ try
+ {
+ return configuration.isBackup();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public boolean isSharedStore()
{
- return configuration.isSharedStore();
+ clearIO();
+ try
+ {
+ return configuration.isSharedStore();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public String getBackupConnectorName()
{
- return configuration.getBackupConnectorName();
+ clearIO();
+ try
+ {
+ return configuration.getBackupConnectorName();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public String getBindingsDirectory()
{
- return configuration.getBindingsDirectory();
+ clearIO();
+ try
+ {
+ return configuration.getBindingsDirectory();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public String[] getInterceptorClassNames()
{
- return configuration.getInterceptorClassNames().toArray(new String[configuration.getInterceptorClassNames()
- .size()]);
+ clearIO();
+ try
+ {
+ return configuration.getInterceptorClassNames().toArray(new String[configuration.getInterceptorClassNames()
+ .size()]);
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public int getJournalBufferSize()
{
- return configuration.getJournalType() == JournalType.ASYNCIO ? configuration.getJournalBufferSize_AIO()
- : configuration.getJournalBufferSize_NIO();
+ clearIO();
+ try
+ {
+ return configuration.getJournalType() == JournalType.ASYNCIO ? configuration.getJournalBufferSize_AIO()
+ : configuration.getJournalBufferSize_NIO();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public int getJournalBufferTimeout()
{
- return configuration.getJournalType() == JournalType.ASYNCIO ? configuration.getJournalBufferTimeout_AIO()
- : configuration.getJournalBufferTimeout_NIO();
+ clearIO();
+ try
+ {
+ return configuration.getJournalType() == JournalType.ASYNCIO ? configuration.getJournalBufferTimeout_AIO()
+ : configuration.getJournalBufferTimeout_NIO();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public int getJournalMaxIO()
{
- return configuration.getJournalType() == JournalType.ASYNCIO ? configuration.getJournalMaxIO_AIO()
- : configuration.getJournalMaxIO_NIO();
+ clearIO();
+ try
+ {
+ return configuration.getJournalType() == JournalType.ASYNCIO ? configuration.getJournalMaxIO_AIO()
+ : configuration.getJournalMaxIO_NIO();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public String getJournalDirectory()
{
- return configuration.getJournalDirectory();
+ clearIO();
+ try
+ {
+ return configuration.getJournalDirectory();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public int getJournalFileSize()
{
- return configuration.getJournalFileSize();
+ clearIO();
+ try
+ {
+ return configuration.getJournalFileSize();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public int getJournalMinFiles()
{
- return configuration.getJournalMinFiles();
+ clearIO();
+ try
+ {
+ return configuration.getJournalMinFiles();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public int getJournalCompactMinFiles()
{
- return configuration.getJournalCompactMinFiles();
+ clearIO();
+ try
+ {
+ return configuration.getJournalCompactMinFiles();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public int getJournalCompactPercentage()
{
- return configuration.getJournalCompactPercentage();
+ clearIO();
+ try
+ {
+ return configuration.getJournalCompactPercentage();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public boolean isPersistenceEnabled()
{
- return configuration.isPersistenceEnabled();
+ clearIO();
+ try
+ {
+ return configuration.isPersistenceEnabled();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public String getJournalType()
{
- return configuration.getJournalType().toString();
+ clearIO();
+ try
+ {
+ return configuration.getJournalType().toString();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public String getPagingDirectory()
{
- return configuration.getPagingDirectory();
+ clearIO();
+ try
+ {
+ return configuration.getPagingDirectory();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public int getScheduledThreadPoolMaxSize()
{
- return configuration.getScheduledThreadPoolMaxSize();
+ clearIO();
+ try
+ {
+ return configuration.getScheduledThreadPoolMaxSize();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public int getThreadPoolMaxSize()
{
- return configuration.getThreadPoolMaxSize();
+ clearIO();
+ try
+ {
+ return configuration.getThreadPoolMaxSize();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public long getSecurityInvalidationInterval()
{
- return configuration.getSecurityInvalidationInterval();
+ clearIO();
+ try
+ {
+ return configuration.getSecurityInvalidationInterval();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public boolean isClustered()
{
- return configuration.isClustered();
+ clearIO();
+ try
+ {
+ return configuration.isClustered();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public boolean isCreateBindingsDir()
{
- return configuration.isCreateBindingsDir();
+ clearIO();
+ try
+ {
+ return configuration.isCreateBindingsDir();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public boolean isCreateJournalDir()
{
- return configuration.isCreateJournalDir();
+ clearIO();
+ try
+ {
+ return configuration.isCreateJournalDir();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public boolean isJournalSyncNonTransactional()
{
- return configuration.isJournalSyncNonTransactional();
+ clearIO();
+ try
+ {
+ return configuration.isJournalSyncNonTransactional();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public boolean isJournalSyncTransactional()
{
- return configuration.isJournalSyncTransactional();
+ clearIO();
+ try
+ {
+ return configuration.isJournalSyncTransactional();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public boolean isSecurityEnabled()
{
- return configuration.isSecurityEnabled();
+ clearIO();
+ try
+ {
+ return configuration.isSecurityEnabled();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
- public void deployQueue(final String address, final String name, String filterString) throws Exception
+ /**
+ * Deploys a queue named {@code name} on {@code address}, passing {@code true} and
+ * {@code false} as the last two arguments of {@code server.deployQueue}.
+ *
+ * @param address address the queue is bound to
+ * @param name name of the queue
+ * @param filterString filter expression; {@code null} is forwarded as a null filter
+ * @throws Exception propagated from {@code server.deployQueue}
+ */
+ public void deployQueue(final String address, final String name, final String filterString) throws Exception
{
- server.deployQueue(new SimpleString(address), new SimpleString(name), new SimpleString(filterString), true, false);
+ clearIO();
+ try
+ {
+ // Guard null the same way the four-argument overload does instead of
+ // handing null to the SimpleString constructor.
+ server.deployQueue(new SimpleString(address),
+ new SimpleString(name),
+ filterString == null ? null : new SimpleString(filterString),
+ true,
+ false);
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public void deployQueue(final String address, final String name, final String filterStr, final boolean durable) throws Exception
{
SimpleString filter = filterStr == null ? null : new SimpleString(filterStr);
+ clearIO();
+ try
+ {
- server.deployQueue(new SimpleString(address), new SimpleString(name), filter, durable, false);
+ server.deployQueue(new SimpleString(address), new SimpleString(name), filter, durable, false);
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public void createQueue(final String address, final String name) throws Exception
{
- server.createQueue(new SimpleString(address), new SimpleString(name), null, true, false);
+ clearIO();
+ try
+ {
+ server.createQueue(new SimpleString(address), new SimpleString(name), null, true, false);
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public void createQueue(final String address, final String name, final boolean durable) throws Exception
{
- server.createQueue(new SimpleString(address), new SimpleString(name), null, durable, false);
+ clearIO();
+ try
+ {
+ server.createQueue(new SimpleString(address), new SimpleString(name), null, durable, false);
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public void createQueue(final String address, final String name, final String filterStr, final boolean durable) throws Exception
{
- SimpleString filter = null;
- if (filterStr != null && !filterStr.trim().equals(""))
+ clearIO();
+ try
{
- filter = new SimpleString(filterStr);
+ SimpleString filter = null;
+ if (filterStr != null && !filterStr.trim().equals(""))
+ {
+ filter = new SimpleString(filterStr);
+ }
+
+ server.createQueue(new SimpleString(address), new SimpleString(name), filter, durable, false);
}
-
- server.createQueue(new SimpleString(address), new SimpleString(name), filter, durable, false);
+ finally
+ {
+ blockOnIO();
+ }
}
public String[] getQueueNames()
{
- Object[] queues = server.getManagementService().getResources(QueueControl.class);
- String[] names = new String[queues.length];
- for (int i = 0; i < queues.length; i++)
+ clearIO();
+ try
{
- QueueControl queue = (QueueControl)queues[i];
- names[i] = queue.getName();
+ Object[] queues = server.getManagementService().getResources(QueueControl.class);
+ String[] names = new String[queues.length];
+ for (int i = 0; i < queues.length; i++)
+ {
+ QueueControl queue = (QueueControl)queues[i];
+ names[i] = queue.getName();
+ }
+
+ return names;
}
-
- return names;
+ finally
+ {
+ blockOnIO();
+ }
}
public String[] getAddressNames()
{
- Object[] addresses = server.getManagementService().getResources(AddressControl.class);
- String[] names = new String[addresses.length];
- for (int i = 0; i < addresses.length; i++)
+ clearIO();
+ try
{
- AddressControl address = (AddressControl)addresses[i];
- names[i] = address.getAddress();
+ Object[] addresses = server.getManagementService().getResources(AddressControl.class);
+ String[] names = new String[addresses.length];
+ for (int i = 0; i < addresses.length; i++)
+ {
+ AddressControl address = (AddressControl)addresses[i];
+ names[i] = address.getAddress();
+ }
+
+ return names;
}
-
- return names;
+ finally
+ {
+ blockOnIO();
+ }
}
public void destroyQueue(final String name) throws Exception
{
- SimpleString queueName = new SimpleString(name);
+ clearIO();
+ try
+ {
+ SimpleString queueName = new SimpleString(name);
- server.destroyQueue(queueName, null);
+ server.destroyQueue(queueName, null);
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public int getConnectionCount()
{
- return server.getConnectionCount();
+ clearIO();
+ try
+ {
+ return server.getConnectionCount();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public void enableMessageCounters()
{
- setMessageCounterEnabled(true);
+ clearIO();
+ try
+ {
+ setMessageCounterEnabled(true);
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public void disableMessageCounters()
{
- setMessageCounterEnabled(false);
+ clearIO();
+ try
+ {
+ setMessageCounterEnabled(false);
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public void resetAllMessageCounters()
{
- messageCounterManager.resetAllCounters();
+ clearIO();
+ try
+ {
+ messageCounterManager.resetAllCounters();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public void resetAllMessageCounterHistories()
{
- messageCounterManager.resetAllCounterHistories();
+ clearIO();
+ try
+ {
+ messageCounterManager.resetAllCounterHistories();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public boolean isMessageCounterEnabled()
{
- return configuration.isMessageCounterEnabled();
+ clearIO();
+ try
+ {
+ return configuration.isMessageCounterEnabled();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public synchronized long getMessageCounterSamplePeriod()
{
- return messageCounterManager.getSamplePeriod();
+ clearIO();
+ try
+ {
+ return messageCounterManager.getSamplePeriod();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public synchronized void setMessageCounterSamplePeriod(final long newPeriod)
{
- if (newPeriod < MessageCounterManagerImpl.MIN_SAMPLE_PERIOD)
+ clearIO();
+ try
{
- throw new IllegalArgumentException("Cannot set MessageCounterSamplePeriod < " + MessageCounterManagerImpl.MIN_SAMPLE_PERIOD +
- " ms");
+ if (newPeriod < MessageCounterManagerImpl.MIN_SAMPLE_PERIOD)
+ {
+ throw new IllegalArgumentException("Cannot set MessageCounterSamplePeriod < " + MessageCounterManagerImpl.MIN_SAMPLE_PERIOD +
+ " ms");
+ }
+
+ if (messageCounterManager != null && newPeriod != messageCounterManager.getSamplePeriod())
+ {
+ messageCounterManager.reschedule(newPeriod);
+ }
}
-
- if (messageCounterManager != null && newPeriod != messageCounterManager.getSamplePeriod())
+ finally
{
- messageCounterManager.reschedule(newPeriod);
+ blockOnIO();
}
}
public int getMessageCounterMaxDayCount()
{
- return messageCounterManager.getMaxDayCount();
+ clearIO();
+ try
+ {
+ return messageCounterManager.getMaxDayCount();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public void setMessageCounterMaxDayCount(final int count)
{
- if (count <= 0)
+ clearIO();
+ try
{
- throw new IllegalArgumentException("invalid value: count must be greater than 0");
+ if (count <= 0)
+ {
+ throw new IllegalArgumentException("invalid value: count must be greater than 0");
+ }
+ messageCounterManager.setMaxDayCount(count);
}
- messageCounterManager.setMaxDayCount(count);
+ finally
+ {
+ blockOnIO();
+ }
}
+ /**
+ * Lists the prepared transactions reported by the resource manager, oldest first.
+ * Each entry is the formatted creation date, the literal " base64: ", the
+ * Base64 form of the Xid, a space, and the Xid's toString() value.
+ *
+ * @return one formatted string per prepared transaction
+ */
public String[] listPreparedTransactions()
{
- DateFormat dateFormat = DateFormat.getDateTimeInstance(DateFormat.SHORT, DateFormat.MEDIUM);
+ clearIO();
+ try
+ {
+ DateFormat dateFormat = DateFormat.getDateTimeInstance(DateFormat.SHORT, DateFormat.MEDIUM);
- Map<Xid, Long> xids = resourceManager.getPreparedTransactionsWithCreationTime();
- ArrayList<Entry<Xid, Long>> xidsSortedByCreationTime = new ArrayList<Map.Entry<Xid, Long>>(xids.entrySet());
- Collections.sort(xidsSortedByCreationTime, new Comparator<Entry<Xid, Long>>()
- {
- public int compare(Entry<Xid, Long> entry1, Entry<Xid, Long> entry2)
+ Map<Xid, Long> xids = resourceManager.getPreparedTransactionsWithCreationTime();
+ ArrayList<Entry<Xid, Long>> xidsSortedByCreationTime = new ArrayList<Map.Entry<Xid, Long>>(xids.entrySet());
+ Collections.sort(xidsSortedByCreationTime, new Comparator<Entry<Xid, Long>>()
{
- // sort by creation time, oldest first
- return (int)(entry1.getValue() - entry2.getValue());
+ public int compare(final Entry<Xid, Long> entry1, final Entry<Xid, Long> entry2)
+ {
+ // sort by creation time, oldest first; compare the Long values
+ // directly, since narrowing their difference to int can overflow
+ // and flip the sign
+ return entry1.getValue().compareTo(entry2.getValue());
+ }
+ });
+ String[] s = new String[xidsSortedByCreationTime.size()];
+ int i = 0;
+ for (Map.Entry<Xid, Long> entry : xidsSortedByCreationTime)
+ {
+ Date creation = new Date(entry.getValue());
+ Xid xid = entry.getKey();
+ s[i++] = dateFormat.format(creation) + " base64: " + XidImpl.toBase64String(xid) + " " + xid.toString();
}
- });
- String[] s = new String[xidsSortedByCreationTime.size()];
- int i = 0;
- for (Map.Entry<Xid, Long> entry : xidsSortedByCreationTime)
+ return s;
+ }
+ finally
{
- Date creation = new Date(entry.getValue());
- Xid xid = entry.getKey();
- s[i++] = dateFormat.format(creation) + " base64: " + XidImpl.toBase64String(xid) + " " + xid.toString();
+ blockOnIO();
}
- return s;
}
public String[] listHeuristicCommittedTransactions()
{
- List<Xid> xids = resourceManager.getHeuristicCommittedTransactions();
- String[] s = new String[xids.size()];
- int i = 0;
- for (Xid xid : xids)
+ clearIO();
+ try
{
- s[i++] = XidImpl.toBase64String(xid);
+ List<Xid> xids = resourceManager.getHeuristicCommittedTransactions();
+ String[] s = new String[xids.size()];
+ int i = 0;
+ for (Xid xid : xids)
+ {
+ s[i++] = XidImpl.toBase64String(xid);
+ }
+ return s;
}
- return s;
+ finally
+ {
+ blockOnIO();
+ }
}
public String[] listHeuristicRolledBackTransactions()
{
- List<Xid> xids = resourceManager.getHeuristicRolledbackTransactions();
- String[] s = new String[xids.size()];
- int i = 0;
- for (Xid xid : xids)
+ clearIO();
+ try
{
- s[i++] = XidImpl.toBase64String(xid);
+ List<Xid> xids = resourceManager.getHeuristicRolledbackTransactions();
+ String[] s = new String[xids.size()];
+ int i = 0;
+ for (Xid xid : xids)
+ {
+ s[i++] = XidImpl.toBase64String(xid);
+ }
+ return s;
}
- return s;
+ finally
+ {
+ blockOnIO();
+ }
}
public synchronized boolean commitPreparedTransaction(final String transactionAsBase64) throws Exception
{
- List<Xid> xids = resourceManager.getPreparedTransactions();
+ clearIO();
+ try
+ {
+ List<Xid> xids = resourceManager.getPreparedTransactions();
- for (Xid xid : xids)
- {
- if (XidImpl.toBase64String(xid).equals(transactionAsBase64))
+ for (Xid xid : xids)
{
- Transaction transaction = resourceManager.removeTransaction(xid);
- transaction.commit(false);
- server.getStorageManager().waitOnOperations();
- long recordID = server.getStorageManager().storeHeuristicCompletion(xid, true);
- resourceManager.putHeuristicCompletion(recordID, xid, true);
- return true;
+ if (XidImpl.toBase64String(xid).equals(transactionAsBase64))
+ {
+ Transaction transaction = resourceManager.removeTransaction(xid);
+ transaction.commit(false);
+ long recordID = server.getStorageManager().storeHeuristicCompletion(xid, true);
+ storageManager.waitOnOperations();
+ resourceManager.putHeuristicCompletion(recordID, xid, true);
+ return true;
+ }
}
+ return false;
}
- return false;
+ finally
+ {
+ blockOnIO();
+ }
}
public synchronized boolean rollbackPreparedTransaction(final String transactionAsBase64) throws Exception
{
- List<Xid> xids = resourceManager.getPreparedTransactions();
- for (Xid xid : xids)
+ clearIO();
+ try
{
- if (XidImpl.toBase64String(xid).equals(transactionAsBase64))
+
+ List<Xid> xids = resourceManager.getPreparedTransactions();
+
+ for (Xid xid : xids)
{
- Transaction transaction = resourceManager.removeTransaction(xid);
- transaction.rollback();
- server.getStorageManager().waitOnOperations();
- long recordID = server.getStorageManager().storeHeuristicCompletion(xid, false);
- resourceManager.putHeuristicCompletion(recordID, xid, false);
- return true;
+ if (XidImpl.toBase64String(xid).equals(transactionAsBase64))
+ {
+ Transaction transaction = resourceManager.removeTransaction(xid);
+ transaction.rollback();
+ long recordID = server.getStorageManager().storeHeuristicCompletion(xid, false);
+ server.getStorageManager().waitOnOperations();
+ resourceManager.putHeuristicCompletion(recordID, xid, false);
+ return true;
+ }
}
+ return false;
}
- return false;
+ finally
+ {
+ blockOnIO();
+ }
}
public String[] listRemoteAddresses()
{
- Set<RemotingConnection> connections = remotingService.getConnections();
+ clearIO();
+ try
+ {
+ Set<RemotingConnection> connections = remotingService.getConnections();
- String[] remoteAddresses = new String[connections.size()];
- int i = 0;
- for (RemotingConnection connection : connections)
+ String[] remoteAddresses = new String[connections.size()];
+ int i = 0;
+ for (RemotingConnection connection : connections)
+ {
+ remoteAddresses[i++] = connection.getRemoteAddress();
+ }
+ return remoteAddresses;
+ }
+ finally
{
- remoteAddresses[i++] = connection.getRemoteAddress();
+ blockOnIO();
}
- return remoteAddresses;
+
}
public String[] listRemoteAddresses(final String ipAddress)
{
- Set<RemotingConnection> connections = remotingService.getConnections();
- List<String> remoteConnections = new ArrayList<String>();
- for (RemotingConnection connection : connections)
+ clearIO();
+ try
{
- String remoteAddress = connection.getRemoteAddress();
- if (remoteAddress.contains(ipAddress))
+ Set<RemotingConnection> connections = remotingService.getConnections();
+ List<String> remoteConnections = new ArrayList<String>();
+ for (RemotingConnection connection : connections)
{
- remoteConnections.add(connection.getRemoteAddress());
+ String remoteAddress = connection.getRemoteAddress();
+ if (remoteAddress.contains(ipAddress))
+ {
+ remoteConnections.add(connection.getRemoteAddress());
+ }
}
+ return remoteConnections.toArray(new String[remoteConnections.size()]);
}
- return (String[])remoteConnections.toArray(new String[remoteConnections.size()]);
+ finally
+ {
+ blockOnIO();
+ }
+
}
public synchronized boolean closeConnectionsForAddress(final String ipAddress)
{
- boolean closed = false;
- Set<RemotingConnection> connections = remotingService.getConnections();
- for (RemotingConnection connection : connections)
+ clearIO();
+ try
{
- String remoteAddress = connection.getRemoteAddress();
- if (remoteAddress.contains(ipAddress))
+ boolean closed = false;
+ Set<RemotingConnection> connections = remotingService.getConnections();
+ for (RemotingConnection connection : connections)
{
- remotingService.removeConnection(connection.getID());
- connection.fail(new HornetQException(HornetQException.INTERNAL_ERROR, "connections for " + ipAddress +
- " closed by management"));
- closed = true;
+ String remoteAddress = connection.getRemoteAddress();
+ if (remoteAddress.contains(ipAddress))
+ {
+ remotingService.removeConnection(connection.getID());
+ connection.fail(new HornetQException(HornetQException.INTERNAL_ERROR, "connections for " + ipAddress +
+ " closed by management"));
+ closed = true;
+ }
}
+
+ return closed;
}
+ finally
+ {
+ blockOnIO();
+ }
- return closed;
}
public String[] listConnectionIDs()
{
- Set<RemotingConnection> connections = remotingService.getConnections();
- String[] connectionIDs = new String[connections.size()];
- int i = 0;
- for (RemotingConnection connection : connections)
+ clearIO();
+ try
{
- connectionIDs[i++] = connection.getID().toString();
+ Set<RemotingConnection> connections = remotingService.getConnections();
+ String[] connectionIDs = new String[connections.size()];
+ int i = 0;
+ for (RemotingConnection connection : connections)
+ {
+ connectionIDs[i++] = connection.getID().toString();
+ }
+ return connectionIDs;
}
- return connectionIDs;
+ finally
+ {
+ blockOnIO();
+ }
}
public String[] listSessions(final String connectionID)
{
- List<ServerSession> sessions = server.getSessions(connectionID);
- String[] sessionIDs = new String[sessions.size()];
- int i = 0;
- for (ServerSession serverSession : sessions)
+ clearIO();
+ try
{
- sessionIDs[i++] = serverSession.getName();
+ List<ServerSession> sessions = server.getSessions(connectionID);
+ String[] sessionIDs = new String[sessions.size()];
+ int i = 0;
+ for (ServerSession serverSession : sessions)
+ {
+ sessionIDs[i++] = serverSession.getName();
+ }
+ return sessionIDs;
}
- return sessionIDs;
+ finally
+ {
+ blockOnIO();
+ }
}
public Object[] getConnectors() throws Exception
{
- Collection<TransportConfiguration> connectorConfigurations = configuration.getConnectorConfigurations().values();
+ clearIO();
+ try
+ {
+ Collection<TransportConfiguration> connectorConfigurations = configuration.getConnectorConfigurations()
+ .values();
- Object[] ret = new Object[connectorConfigurations.size()];
+ Object[] ret = new Object[connectorConfigurations.size()];
- int i = 0;
- for (TransportConfiguration config : connectorConfigurations)
- {
- Object[] tc = new Object[3];
+ int i = 0;
+ for (TransportConfiguration config : connectorConfigurations)
+ {
+ Object[] tc = new Object[3];
- tc[0] = config.getName();
- tc[1] = config.getFactoryClassName();
- tc[2] = config.getParams();
+ tc[0] = config.getName();
+ tc[1] = config.getFactoryClassName();
+ tc[2] = config.getParams();
- ret[i++] = tc;
+ ret[i++] = tc;
+ }
+
+ return ret;
}
-
- return ret;
+ finally
+ {
+ blockOnIO();
+ }
}
public String getConnectorsAsJSON() throws Exception
{
- JSONArray array = new JSONArray();
+ clearIO();
+ try
+ {
+ JSONArray array = new JSONArray();
- for (TransportConfiguration config : configuration.getConnectorConfigurations().values())
+ for (TransportConfiguration config : configuration.getConnectorConfigurations().values())
+ {
+ array.put(new JSONObject(config));
+ }
+
+ return array.toString();
+ }
+ finally
{
- array.put(new JSONObject(config));
+ blockOnIO();
}
-
- return array.toString();
}
public void sendQueueInfoToQueue(final String queueName, final String address) throws Exception
{
- postOffice.sendQueueInfoToQueue(new SimpleString(queueName), new SimpleString(address));
- // blocking on IO. Otherwise the method would return before the operation was finished
- server.getStorageManager().waitOnOperations();
+ clearIO();
+ try
+ {
+ postOffice.sendQueueInfoToQueue(new SimpleString(queueName), new SimpleString(address));
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
// NotificationEmitter implementation ----------------------------
@@ -587,19 +1060,43 @@
final NotificationFilter filter,
final Object handback) throws ListenerNotFoundException
{
- broadcaster.removeNotificationListener(listener, filter, handback);
+ clearIO();
+ try
+ {
+ broadcaster.removeNotificationListener(listener, filter, handback);
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public void removeNotificationListener(final NotificationListener listener) throws ListenerNotFoundException
{
- broadcaster.removeNotificationListener(listener);
+ clearIO();
+ try
+ {
+ broadcaster.removeNotificationListener(listener);
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public void addNotificationListener(final NotificationListener listener,
final NotificationFilter filter,
final Object handback) throws IllegalArgumentException
{
- broadcaster.addNotificationListener(listener, filter, handback);
+ clearIO();
+ try
+ {
+ broadcaster.addNotificationListener(listener, filter, handback);
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public MBeanNotificationInfo[] getNotificationInfo()
@@ -621,7 +1118,7 @@
// Private -------------------------------------------------------
- private synchronized void setMessageCounterEnabled(boolean enable)
+ private synchronized void setMessageCounterEnabled(final boolean enable)
{
if (isStarted())
{
Modified: trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-12-03 23:46:53 UTC (rev 8543)
+++ trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-12-03 23:54:22 UTC (rev 8544)
@@ -214,6 +214,7 @@
remotingService,
messagingServer,
messageCounterManager,
+ storageManager,
broadcaster);
ObjectName objectName = objectNameBuilder.getHornetQServerObjectName();
registerInJMX(objectName, messagingServerControl);
@@ -232,7 +233,7 @@
public synchronized void registerAddress(final SimpleString address) throws Exception
{
ObjectName objectName = objectNameBuilder.getAddressObjectName(address);
- AddressControlImpl addressControl = new AddressControlImpl(address, postOffice, pagingManager, securityRepository);
+ AddressControlImpl addressControl = new AddressControlImpl(address, postOffice, pagingManager, storageManager, securityRepository);
registerInJMX(objectName, addressControl);
@@ -293,7 +294,7 @@
public synchronized void registerDivert(Divert divert, DivertConfiguration config) throws Exception
{
ObjectName objectName = objectNameBuilder.getDivertObjectName(divert.getUniqueName());
- DivertControl divertControl = new DivertControlImpl(divert, config);
+ DivertControl divertControl = new DivertControlImpl(divert, storageManager, config);
registerInJMX(objectName, new StandardMBean(divertControl, DivertControl.class));
registerInRegistry(ResourceNames.CORE_DIVERT + config.getName(), divertControl);
@@ -313,7 +314,7 @@
public synchronized void registerAcceptor(final Acceptor acceptor, final TransportConfiguration configuration) throws Exception
{
ObjectName objectName = objectNameBuilder.getAcceptorObjectName(configuration.getName());
- AcceptorControl control = new AcceptorControlImpl(acceptor, configuration);
+ AcceptorControl control = new AcceptorControlImpl(acceptor, storageManager, configuration);
registerInJMX(objectName, new StandardMBean(control, AcceptorControl.class));
registerInRegistry(ResourceNames.CORE_ACCEPTOR + configuration.getName(), control);
}
@@ -355,7 +356,7 @@
{
broadcastGroup.setNotificationService(this);
ObjectName objectName = objectNameBuilder.getBroadcastGroupObjectName(configuration.getName());
- BroadcastGroupControl control = new BroadcastGroupControlImpl(broadcastGroup, configuration);
+ BroadcastGroupControl control = new BroadcastGroupControlImpl(broadcastGroup, storageManager, configuration);
registerInJMX(objectName, new StandardMBean(control, BroadcastGroupControl.class));
registerInRegistry(ResourceNames.CORE_BROADCAST_GROUP + configuration.getName(), control);
}
@@ -372,7 +373,7 @@
{
discoveryGroup.setNotificationService(this);
ObjectName objectName = objectNameBuilder.getDiscoveryGroupObjectName(configuration.getName());
- DiscoveryGroupControl control = new DiscoveryGroupControlImpl(discoveryGroup, configuration);
+ DiscoveryGroupControl control = new DiscoveryGroupControlImpl(discoveryGroup, storageManager, configuration);
registerInJMX(objectName, new StandardMBean(control, DiscoveryGroupControl.class));
registerInRegistry(ResourceNames.CORE_DISCOVERY_GROUP + configuration.getName(), control);
}
@@ -388,7 +389,7 @@
{
bridge.setNotificationService(this);
ObjectName objectName = objectNameBuilder.getBridgeObjectName(configuration.getName());
- BridgeControl control = new BridgeControlImpl(bridge, configuration);
+ BridgeControl control = new BridgeControlImpl(bridge, storageManager, configuration);
registerInJMX(objectName, new StandardMBean(control, BridgeControl.class));
registerInRegistry(ResourceNames.CORE_BRIDGE + configuration.getName(), control);
}
@@ -404,7 +405,7 @@
final ClusterConnectionConfiguration configuration) throws Exception
{
ObjectName objectName = objectNameBuilder.getClusterConnectionObjectName(configuration.getName());
- ClusterConnectionControl control = new ClusterConnectionControlImpl(cluster, configuration);
+ ClusterConnectionControl control = new ClusterConnectionControlImpl(cluster, storageManager, configuration);
registerInJMX(objectName, new StandardMBean(control, ClusterConnectionControl.class));
registerInRegistry(ResourceNames.CORE_CLUSTER_CONNECTION + configuration.getName(), control);
}
Modified: trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2009-12-03 23:46:53 UTC (rev 8543)
+++ trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2009-12-03 23:54:22 UTC (rev 8544)
@@ -44,7 +44,7 @@
* @version <tt>$Revision$</tt>
*
*/
-public class QueueControlImpl extends StandardMBean implements QueueControl
+public class QueueControlImpl extends AbstractControl implements QueueControl
{
// Constants -----------------------------------------------------
@@ -60,8 +60,6 @@
private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
- private final StorageManager storageManager;
-
private MessageCounter counter;
// Static --------------------------------------------------------
@@ -84,12 +82,11 @@
final StorageManager storageManager,
final HierarchicalRepository<AddressSettings> addressSettingsRepository) throws Exception
{
- super(QueueControl.class);
+ super(QueueControl.class, storageManager);
this.queue = queue;
this.address = address;
this.postOffice = postOffice;
this.addressSettingsRepository = addressSettingsRepository;
- this.storageManager = storageManager;
}
// Public --------------------------------------------------------
@@ -711,23 +708,5 @@
// Private -------------------------------------------------------
- private void clearIO()
- {
- storageManager.clearContext();
- }
-
- private void blockOnIO()
- {
- try
- {
- storageManager.waitOnOperations();
- storageManager.clearContext();
- }
- catch (Exception e)
- {
- throw new RuntimeException(e.getMessage(), e);
- }
- }
-
// Inner classes -------------------------------------------------
}
Modified: trunk/src/main/org/hornetq/core/persistence/OperationContext.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/OperationContext.java 2009-12-03 23:46:53 UTC (rev 8543)
+++ trunk/src/main/org/hornetq/core/persistence/OperationContext.java 2009-12-03 23:54:22 UTC (rev 8544)
@@ -38,5 +38,9 @@
/** To be called when there are no more operations pending */
void complete();
+
+ void waitCompletion() throws Exception;
+
+ boolean waitCompletion(long timeout) throws Exception;
}
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-12-03 23:46:53 UTC (rev 8543)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-12-03 23:54:22 UTC (rev 8544)
@@ -315,6 +315,11 @@
public void waitOnOperations() throws Exception
{
+ if (!started)
+ {
+ log.warn("Server is stopped");
+ throw new IllegalStateException("Server is stopped");
+ }
waitOnOperations(0);
}
@@ -323,16 +328,14 @@
*/
public void waitOnOperations(final long timeout) throws Exception
{
- SimpleWaitIOCallback waitCallback = new SimpleWaitIOCallback();
- afterCompleteOperations(waitCallback);
- completeOperations();
- if (timeout == 0)
+ if (!started)
{
- waitCallback.waitCompletion();
+ log.warn("Server is stopped");
+ throw new IllegalStateException("Server is stopped");
}
- else if (!waitCallback.waitCompletion(timeout))
+ if (!getContext().waitCompletion(timeout))
{
- throw new IllegalStateException("no response received from replication");
+ throw new HornetQException(HornetQException.IO_ERROR, "Timeout on waiting I/O completion");
}
}
@@ -1542,6 +1545,21 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.OperationContext#waitCompletion()
+ */
+ public void waitCompletion()
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.OperationContext#waitCompletion(long)
+ */
+ public boolean waitCompletion(long timeout)
+ {
+ return true;
+ }
+
}
private static class XidEncoding implements EncodingSupport
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-12-03 23:46:53 UTC (rev 8543)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-12-03 23:54:22 UTC (rev 8544)
@@ -21,6 +21,7 @@
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.journal.impl.SimpleWaitIOCallback;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.utils.ExecutorFactory;
@@ -210,6 +211,7 @@
}
catch (Throwable e)
{
+ e.printStackTrace();
log.warn("Error on executor's submit");
executorsPending.decrementAndGet();
task.onError(HornetQException.INTERNAL_ERROR, "It wasn't possible to complete IO operation - " + e.getMessage());
@@ -262,4 +264,27 @@
}
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.OperationContext#waitCompletion()
+ */
+ public void waitCompletion() throws Exception
+ {
+ waitCompletion(0);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.OperationContext#waitCompletion(long)
+ */
+ public boolean waitCompletion(long timeout) throws Exception
+ {
+ SimpleWaitIOCallback waitCallback = new SimpleWaitIOCallback();
+ executeOnCompletion(waitCallback);
+ complete();
+ if (timeout == 0)
+ {
+ waitCallback.waitCompletion();
+ }
+ return (waitCallback.waitCompletion(timeout));
+ }
+
}
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-12-03 23:46:53 UTC (rev 8543)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-12-03 23:54:22 UTC (rev 8544)
@@ -306,7 +306,7 @@
{
initialisePart2();
}
-
+
// We start the remoting service here - if the server is a backup remoting service needs to be started
// so it can be initialised by the live node
remotingService.start();
Modified: trunk/tests/src/org/hornetq/tests/integration/client/OrderTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/OrderTest.java 2009-12-03 23:46:53 UTC (rev 8543)
+++ trunk/tests/src/org/hornetq/tests/integration/client/OrderTest.java 2009-12-03 23:54:22 UTC (rev 8544)
@@ -85,7 +85,7 @@
for (int i = 0; i < 100; i++)
{
- ClientMessage msg = session.createClientMessage(i % 2 == 0);
+ ClientMessage msg = session.createClientMessage(i % 2 == 0);
msg.putIntProperty("id", i);
prod.send(msg);
}
@@ -103,7 +103,7 @@
server.stop();
server.start();
}
-
+
session = sf.createSession(true, true);
session.start();
@@ -144,6 +144,86 @@
}
+ public void testOrderOverSessionClosePersistent() throws Exception
+ {
+ doTestOverCancel(true);
+ }
+
+ public void testOrderOverSessionCloseNonPersistent() throws Exception
+ {
+ doTestOverCancel(false);
+ }
+
+ public void doTestOverCancel(final boolean persistent) throws Exception
+ {
+ server = createServer(persistent, true);
+ server.start();
+
+ ClientSessionFactory sf = createNettyFactory();
+
+ sf.setBlockOnNonPersistentSend(false);
+ sf.setBlockOnPersistentSend(false);
+ sf.setBlockOnAcknowledge(false);
+
+ ClientSession session = sf.createSession(true, true, 0);
+
+ int numberOfMessages = 500;
+
+ try
+ {
+ session.createQueue("queue", "queue", true);
+
+ ClientProducer prod = session.createProducer("queue");
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage msg = session.createClientMessage(i % 2 == 0);
+ msg.putIntProperty("id", i);
+ prod.send(msg);
+ }
+
+ session.close();
+
+ for (int i = 0 ; i < numberOfMessages;)
+ {
+ session = sf.createSession();
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer("queue");
+
+ int max = i + 10;
+
+ for (;i < max; i++)
+ {
+ ClientMessage msg = consumer.receive(1000);
+
+ msg.acknowledge();
+
+ assertEquals(i, msg.getIntProperty("id").intValue());
+ }
+
+ // Receive a few more messages but don't consume them
+ for (int j = 0 ; j < 10 && i < numberOfMessages; j++)
+ {
+ ClientMessage msg = consumer.receiveImmediate();
+ if (msg == null)
+ {
+ break;
+ }
+ }
+ session.close();
+
+ }
+ }
+ finally
+ {
+ sf.close();
+ session.close();
+ }
+
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/integration/management/ManagementServiceImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/ManagementServiceImplTest.java 2009-12-03 23:46:53 UTC (rev 8543)
+++ trunk/tests/src/org/hornetq/tests/integration/management/ManagementServiceImplTest.java 2009-12-03 23:54:22 UTC (rev 8544)
@@ -26,6 +26,7 @@
import org.hornetq.core.management.QueueControl;
import org.hornetq.core.management.ResourceNames;
import org.hornetq.core.management.impl.ManagementServiceImpl;
+import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
import org.hornetq.core.server.HornetQ;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
@@ -151,6 +152,7 @@
Configuration conf = new ConfigurationImpl();
conf.setJMXManagementEnabled(false);
ManagementServiceImpl managementService = new ManagementServiceImpl(null, conf);
+ managementService.setStorageManager(new NullStorageManager());
SimpleString address = randomSimpleString();
managementService.registerAddress(address);
Added: trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/OperationContextUnitTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/OperationContextUnitTest.java (rev 0)
+++ trunk/tests/src/org/hornetq/tests/unit/core/persistence/impl/OperationContextUnitTest.java 2009-12-03 23:54:22 UTC (rev 8544)
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.unit.core.persistence.impl;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.tests.util.UnitTestCase;
+
+/**
+ * A OperationContextUnitTest
+ *
+ * @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class OperationContextUnitTest extends UnitTestCase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testCaptureException() throws Exception
+ {
+ ExecutorService executor = Executors.newSingleThreadExecutor();
+ executor.shutdown();
+
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ final OperationContextImpl impl = new OperationContextImpl(executor)
+ {
+ public void complete()
+ {
+ super.complete();
+ latch.countDown();
+ }
+
+ };
+
+ impl.storeLineUp();
+
+ final AtomicInteger numberOfFailures = new AtomicInteger(0);
+
+ Thread t = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ impl.waitCompletion(5000);
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ numberOfFailures.incrementAndGet();
+ }
+ }
+ };
+
+ t.start();
+
+ // We need to wait for complete() to be called first, otherwise the test would be invalid.
+ // We use a latch for that instead of forcing a sleep here
+ assertTrue(latch.await(5, TimeUnit.SECONDS));
+
+ impl.done();
+
+ t.join();
+
+ assertEquals(1, numberOfFailures.get());
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
15 years, 1 month
JBoss hornetq SVN: r8543 - trunk/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-12-03 18:46:53 -0500 (Thu, 03 Dec 2009)
New Revision: 8543
Modified:
trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
Fixing size for test
Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2009-12-03 20:20:29 UTC (rev 8542)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2009-12-03 23:46:53 UTC (rev 8543)
@@ -1014,7 +1014,7 @@
AddressSettings pagedDestinationB = new AddressSettings();
pagedDestinationB.setPageSizeBytes(2024);
- pagedDestinationB.setMaxSizeBytes(20 * 1024);
+ pagedDestinationB.setMaxSizeBytes(25 * 1024);
addresses.put(PAGED_ADDRESS_B.toString(), pagedDestinationB);
15 years, 1 month
JBoss hornetq SVN: r8542 - in trunk: src/main/org/hornetq/core/persistence/impl/journal and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-12-03 15:20:29 -0500 (Thu, 03 Dec 2009)
New Revision: 8542
Modified:
trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
Log:
Fix for JMS Tests
Modified: trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2009-12-03 20:10:15 UTC (rev 8541)
+++ trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2009-12-03 20:20:29 UTC (rev 8542)
@@ -59,19 +59,18 @@
private final PostOffice postOffice;
private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
-
+
private final StorageManager storageManager;
private MessageCounter counter;
// Static --------------------------------------------------------
- private static String toJSON(Map<String, Object>[] messages)
+ private static String toJSON(final Map<String, Object>[] messages)
{
JSONArray array = new JSONArray();
- for (int i = 0; i < messages.length; i++)
+ for (Map<String, Object> message : messages)
{
- Map<String, Object> message = messages[i];
array.put(new JSONObject(message));
}
return array.toString();
@@ -95,7 +94,7 @@
// Public --------------------------------------------------------
- public void setMessageCounter(MessageCounter counter)
+ public void setMessageCounter(final MessageCounter counter)
{
this.counter = counter;
}
@@ -104,7 +103,15 @@
public String getName()
{
- return queue.getName().toString();
+ clearIO();
+ try
+ {
+ return queue.getName().toString();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public String getAddress()
@@ -114,123 +121,244 @@
public String getFilter()
{
- Filter filter = queue.getFilter();
+ clearIO();
+ try
+ {
+ Filter filter = queue.getFilter();
- return (filter != null) ? filter.getFilterString().toString() : null;
+ return filter != null ? filter.getFilterString().toString() : null;
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public boolean isDurable()
{
- return queue.isDurable();
+ clearIO();
+ try
+ {
+ return queue.isDurable();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public boolean isTemporary()
{
- return queue.isTemporary();
+ clearIO();
+ try
+ {
+ return queue.isTemporary();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public int getMessageCount()
{
- return queue.getMessageCount();
+ clearIO();
+ try
+ {
+ return queue.getMessageCount();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public int getConsumerCount()
{
- return queue.getConsumerCount();
+ clearIO();
+ try
+ {
+ return queue.getConsumerCount();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public int getDeliveringCount()
{
- return queue.getDeliveringCount();
+ clearIO();
+ try
+ {
+ return queue.getDeliveringCount();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public int getMessagesAdded()
{
- return queue.getMessagesAdded();
+ clearIO();
+ try
+ {
+ return queue.getMessagesAdded();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public long getID()
{
- return queue.getID();
+ clearIO();
+ try
+ {
+ return queue.getID();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public long getScheduledCount()
{
- return queue.getScheduledCount();
+ clearIO();
+ try
+ {
+ return queue.getScheduledCount();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public String getDeadLetterAddress()
{
- AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
+ clearIO();
+ try
+ {
+ AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
- if (addressSettings != null && addressSettings.getDeadLetterAddress() != null)
- {
- return addressSettings.getDeadLetterAddress().toString();
+ if (addressSettings != null && addressSettings.getDeadLetterAddress() != null)
+ {
+ return addressSettings.getDeadLetterAddress().toString();
+ }
+ else
+ {
+ return null;
+ }
}
- else
+ finally
{
- return null;
+ blockOnIO();
}
}
public void setDeadLetterAddress(final String deadLetterAddress) throws Exception
{
- AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
+ clearIO();
+ try
+ {
+ AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
- if (deadLetterAddress != null)
+ if (deadLetterAddress != null)
+ {
+ addressSettings.setDeadLetterAddress(new SimpleString(deadLetterAddress));
+ }
+ }
+ finally
{
- addressSettings.setDeadLetterAddress(new SimpleString(deadLetterAddress));
+ blockOnIO();
}
}
public String getExpiryAddress()
{
- AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
+ clearIO();
+ try
+ {
+ AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
- if (addressSettings != null && addressSettings.getExpiryAddress() != null)
- {
- return addressSettings.getExpiryAddress().toString();
+ if (addressSettings != null && addressSettings.getExpiryAddress() != null)
+ {
+ return addressSettings.getExpiryAddress().toString();
+ }
+ else
+ {
+ return null;
+ }
}
- else
+ finally
{
- return null;
+ blockOnIO();
}
}
public void setExpiryAddress(final String expiryAddress) throws Exception
{
- AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
+ clearIO();
+ try
+ {
+ AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
- SimpleString sExpiryAddress = new SimpleString(expiryAddress);
+ SimpleString sExpiryAddress = new SimpleString(expiryAddress);
- if (expiryAddress != null)
+ if (expiryAddress != null)
+ {
+ addressSettings.setExpiryAddress(sExpiryAddress);
+ }
+
+ queue.setExpiryAddress(sExpiryAddress);
+ }
+ finally
{
- addressSettings.setExpiryAddress(sExpiryAddress);
+ blockOnIO();
}
-
- queue.setExpiryAddress(sExpiryAddress);
}
public Map<String, Object>[] listScheduledMessages() throws Exception
{
- List<MessageReference> refs = queue.getScheduledMessages();
- Map<String, Object>[] messages = new Map[refs.size()];
- int i = 0;
- for (MessageReference ref : refs)
+ clearIO();
+ try
{
- Message message = ref.getMessage();
- messages[i++] = message.toMap();
+ List<MessageReference> refs = queue.getScheduledMessages();
+ Map<String, Object>[] messages = new Map[refs.size()];
+ int i = 0;
+ for (MessageReference ref : refs)
+ {
+ Message message = ref.getMessage();
+ messages[i++] = message.toMap();
+ }
+ return messages;
}
- return messages;
+ finally
+ {
+ blockOnIO();
+ }
}
public String listScheduledMessagesAsJSON() throws Exception
{
- return toJSON(listScheduledMessages());
+ clearIO();
+ try
+ {
+ return toJSON(listScheduledMessages());
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public Map<String, Object>[] listMessages(final String filterStr) throws Exception
{
+ clearIO();
try
{
Filter filter = FilterImpl.createFilter(filterStr);
@@ -248,22 +376,43 @@
{
throw new IllegalStateException(e.getMessage());
}
+ finally
+ {
+ blockOnIO();
+ }
}
- public String listMessagesAsJSON(String filter) throws Exception
+ public String listMessagesAsJSON(final String filter) throws Exception
{
- return toJSON(listMessages(filter));
+ clearIO();
+ try
+ {
+ return toJSON(listMessages(filter));
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public int countMessages(final String filterStr) throws Exception
{
- Filter filter = FilterImpl.createFilter(filterStr);
- List<MessageReference> refs = queue.list(filter);
- return refs.size();
+ clearIO();
+ try
+ {
+ Filter filter = FilterImpl.createFilter(filterStr);
+ List<MessageReference> refs = queue.list(filter);
+ return refs.size();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public boolean removeMessage(final long messageID) throws Exception
{
+ clearIO();
try
{
return queue.deleteReference(messageID);
@@ -272,138 +421,185 @@
{
throw new IllegalStateException(e.getMessage());
}
+ finally
+ {
+ blockOnIO();
+ }
}
public int removeMessages(final String filterStr) throws Exception
{
- Filter filter = FilterImpl.createFilter(filterStr);
-
- int retValue = queue.deleteMatchingReferences(filter);
-
- // Waiting on IO otherwise the operation would return before the operation completed
- storageManager.waitOnOperations();
-
- return retValue;
+ clearIO();
+ try
+ {
+ Filter filter = FilterImpl.createFilter(filterStr);
+
+ return queue.deleteMatchingReferences(filter);
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public boolean expireMessage(final long messageID) throws Exception
{
- boolean retValue =queue.expireReference(messageID);
-
- // Waiting on IO otherwise the operation would return before the operation completed
- storageManager.waitOnOperations();
-
- return retValue;
+ clearIO();
+ try
+ {
+ return queue.expireReference(messageID);
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public int expireMessages(final String filterStr) throws Exception
{
+ clearIO();
try
{
Filter filter = FilterImpl.createFilter(filterStr);
- int retValue = queue.expireReferences(filter);
-
- // Waiting on IO otherwise the operation would return before the operation completed
- storageManager.waitOnOperations();
-
- return retValue;
+ return queue.expireReferences(filter);
}
catch (HornetQException e)
{
throw new IllegalStateException(e.getMessage());
}
+ finally
+ {
+ blockOnIO();
+ }
}
public boolean moveMessage(final long messageID, final String otherQueueName) throws Exception
{
- Binding binding = postOffice.getBinding(new SimpleString(otherQueueName));
+ clearIO();
+ try
+ {
+ Binding binding = postOffice.getBinding(new SimpleString(otherQueueName));
- if (binding == null)
+ if (binding == null)
+ {
+ throw new IllegalArgumentException("No queue found for " + otherQueueName);
+ }
+
+ return queue.moveReference(messageID, binding.getAddress());
+ }
+ finally
{
- throw new IllegalArgumentException("No queue found for " + otherQueueName);
+ blockOnIO();
}
- boolean retValue = queue.moveReference(messageID, binding.getAddress());
-
- // Waiting on IO otherwise the operation would return before the operation completed
- storageManager.waitOnOperations();
-
- return retValue;
}
public int moveMessages(final String filterStr, final String otherQueueName) throws Exception
{
- Filter filter = FilterImpl.createFilter(filterStr);
+ clearIO();
+ try
+ {
+ Filter filter = FilterImpl.createFilter(filterStr);
- Binding binding = postOffice.getBinding(new SimpleString(otherQueueName));
+ Binding binding = postOffice.getBinding(new SimpleString(otherQueueName));
- if (binding == null)
+ if (binding == null)
+ {
+ throw new IllegalArgumentException("No queue found for " + otherQueueName);
+ }
+
+ int retValue = queue.moveReferences(filter, binding.getAddress());
+
+ return retValue;
+ }
+ finally
{
- throw new IllegalArgumentException("No queue found for " + otherQueueName);
+ blockOnIO();
}
- int retValue = queue.moveReferences(filter, binding.getAddress());
-
- // Waiting on IO otherwise the operation would return before the operation completed
- storageManager.waitOnOperations();
-
- return retValue;
-
}
public int sendMessagesToDeadLetterAddress(final String filterStr) throws Exception
{
- Filter filter = FilterImpl.createFilter(filterStr);
+ clearIO();
+ try
+ {
+ Filter filter = FilterImpl.createFilter(filterStr);
- List<MessageReference> refs = queue.list(filter);
+ List<MessageReference> refs = queue.list(filter);
- for (MessageReference ref : refs)
+ for (MessageReference ref : refs)
+ {
+ sendMessageToDeadLetterAddress(ref.getMessage().getMessageID());
+ }
+
+ return refs.size();
+ }
+ finally
{
- sendMessageToDeadLetterAddress(ref.getMessage().getMessageID());
+ blockOnIO();
}
-
- // Waiting on IO otherwise the operation would return before the operation completed
- storageManager.waitOnOperations();
-
- return refs.size();
}
public boolean sendMessageToDeadLetterAddress(final long messageID) throws Exception
{
- boolean retValue = queue.sendMessageToDeadLetterAddress(messageID);
-
- // Waiting on IO otherwise the operation would return before the operation completed
- storageManager.waitOnOperations();
+ clearIO();
+ try
+ {
- return retValue;
+ boolean retValue = queue.sendMessageToDeadLetterAddress(messageID);
+
+ return retValue;
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
- public int changeMessagesPriority(String filterStr, int newPriority) throws Exception
+ public int changeMessagesPriority(final String filterStr, final int newPriority) throws Exception
{
- Filter filter = FilterImpl.createFilter(filterStr);
+ clearIO();
+ try
+ {
+ Filter filter = FilterImpl.createFilter(filterStr);
- List<MessageReference> refs = queue.list(filter);
+ List<MessageReference> refs = queue.list(filter);
- for (MessageReference ref : refs)
+ for (MessageReference ref : refs)
+ {
+ changeMessagePriority(ref.getMessage().getMessageID(), newPriority);
+ }
+
+ return refs.size();
+ }
+ finally
{
- changeMessagePriority(ref.getMessage().getMessageID(), newPriority);
+ blockOnIO();
}
-
- return refs.size();
}
public boolean changeMessagePriority(final long messageID, final int newPriority) throws Exception
{
- if (newPriority < 0 || newPriority > 9)
+ clearIO();
+ try
{
- throw new IllegalArgumentException("invalid newPriority value: " + newPriority +
- ". It must be between 0 and 9 (both included)");
+ if (newPriority < 0 || newPriority > 9)
+ {
+ throw new IllegalArgumentException("invalid newPriority value: " + newPriority +
+ ". It must be between 0 and 9 (both included)");
+ }
+ return queue.changeReferencePriority(messageID, (byte)newPriority);
}
- return queue.changeReferencePriority(messageID, (byte)newPriority);
+ finally
+ {
+ blockOnIO();
+ }
}
public String listMessageCounter()
{
+ clearIO();
try
{
return MessageCounterInfo.toJSon(counter);
@@ -412,41 +608,101 @@
{
throw new IllegalStateException(e);
}
+ finally
+ {
+ blockOnIO();
+ }
}
public void resetMessageCounter()
{
- counter.resetCounter();
+ clearIO();
+ try
+ {
+ counter.resetCounter();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public String listMessageCounterAsHTML()
{
- return MessageCounterHelper.listMessageCounterAsHTML(new MessageCounter[] { counter });
+ clearIO();
+ try
+ {
+ return MessageCounterHelper.listMessageCounterAsHTML(new MessageCounter[] { counter });
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public String listMessageCounterHistory() throws Exception
{
- return MessageCounterHelper.listMessageCounterHistory(counter);
+ clearIO();
+ try
+ {
+ return MessageCounterHelper.listMessageCounterHistory(counter);
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public String listMessageCounterHistoryAsHTML()
{
- return MessageCounterHelper.listMessageCounterHistoryAsHTML(new MessageCounter[] { counter });
+ clearIO();
+ try
+ {
+ return MessageCounterHelper.listMessageCounterHistoryAsHTML(new MessageCounter[] { counter });
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public void pause()
{
- queue.pause();
+ clearIO();
+ try
+ {
+ queue.pause();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public void resume()
{
- queue.resume();
+ clearIO();
+ try
+ {
+ queue.resume();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
public boolean isPaused() throws Exception
{
- return queue.isPaused();
+ clearIO();
+ try
+ {
+ return queue.isPaused();
+ }
+ finally
+ {
+ blockOnIO();
+ }
}
// Package protected ---------------------------------------------
@@ -455,5 +711,23 @@
// Private -------------------------------------------------------
+ private void clearIO()
+ {
+ storageManager.clearContext();
+ }
+
+ private void blockOnIO()
+ {
+ try
+ {
+ storageManager.waitOnOperations();
+ storageManager.clearContext();
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
// Inner classes -------------------------------------------------
}
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-12-03 20:10:15 UTC (rev 8541)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-12-03 20:20:29 UTC (rev 8542)
@@ -19,6 +19,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
+import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.OperationContext;
@@ -194,16 +195,25 @@
private void execute(final IOAsyncTask task)
{
executorsPending.incrementAndGet();
- executor.execute(new Runnable()
+ try
{
- public void run()
+ executor.execute(new Runnable()
{
- // If any IO is done inside the callback, it needs to be done on a new context
- clearContext();
- task.done();
- executorsPending.decrementAndGet();
- }
- });
+ public void run()
+ {
+ // If any IO is done inside the callback, it needs to be done on a new context
+ clearContext();
+ task.done();
+ executorsPending.decrementAndGet();
+ }
+ });
+ }
+ catch (Throwable e)
+ {
+ log.warn("Error on executor's submit");
+ executorsPending.decrementAndGet();
+ task.onError(HornetQException.INTERNAL_ERROR, "It wasn't possible to complete IO operation - " + e.getMessage());
+ }
}
/* (non-Javadoc)
Modified: trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java
===================================================================
--- trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2009-12-03 20:10:15 UTC (rev 8541)
+++ trunk/tests/jms-tests/src/org/hornetq/jms/tests/tools/container/LocalTestServer.java 2009-12-03 20:20:29 UTC (rev 8542)
@@ -52,6 +52,7 @@
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.ObjectNameBuilder;
+import org.hornetq.core.management.QueueControl;
import org.hornetq.core.management.ResourceNames;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.BindingType;
@@ -62,6 +63,7 @@
import org.hornetq.jms.HornetQQueue;
import org.hornetq.jms.HornetQTopic;
import org.hornetq.jms.server.JMSServerManager;
+import org.hornetq.jms.server.config.TopicConfiguration;
import org.hornetq.jms.server.management.JMSQueueControl;
import org.hornetq.jms.server.management.TopicControl;
import org.hornetq.utils.Pair;
@@ -101,7 +103,7 @@
private int serverIndex;
- HornetQBootstrapServer bootstrap;
+ private HornetQBootstrapServer bootstrap;
// Constructors ---------------------------------------------------------------------------------
@@ -168,6 +170,7 @@
bootstrap.shutDown();
started = false;
unbindAll();
+ bootstrap = null;
return true;
}
@@ -429,15 +432,15 @@
public void removeAllMessages(String destination, boolean isQueue) throws Exception
{
- SimpleString address = HornetQQueue.createAddressFromName(destination);
- if (!isQueue)
+ if (isQueue)
{
- address = HornetQTopic.createAddressFromName(destination);
- }
- Binding binding = getHornetQServer().getPostOffice().getBinding(address);
- if (binding != null && binding.getType() == BindingType.LOCAL_QUEUE)
+ JMSQueueControl queue = (JMSQueueControl)getHornetQServer().getManagementService().getResource(ResourceNames.JMS_QUEUE + destination);
+ queue.removeMessages(null);
+ }
+ else
{
- ((Queue)binding.getBindable()).deleteAllReferences();
+ TopicControl topic = (TopicControl)getHornetQServer().getManagementService().getResource(ResourceNames.JMS_TOPIC + destination);
+ topic.removeMessages(null);
}
}
@@ -472,3 +475,4 @@
// Inner classes --------------------------------------------------------------------------------
}
+
15 years, 1 month
JBoss hornetq SVN: r8541 - trunk/tests/src/org/hornetq/tests/integration/cluster/reattach.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-12-03 15:10:15 -0500 (Thu, 03 Dec 2009)
New Revision: 8541
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java
Log:
reduced number of iterations to 5
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java 2009-12-03 20:09:44 UTC (rev 8540)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java 2009-12-03 20:10:15 UTC (rev 8541)
@@ -1159,7 +1159,7 @@
protected int getNumIterations()
{
- return 10;
+ return 5;
}
@Override
15 years, 1 month
JBoss hornetq SVN: r8540 - in trunk: src/main/org/hornetq/core/server/cluster/impl and 2 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-12-03 15:09:44 -0500 (Thu, 03 Dec 2009)
New Revision: 8540
Modified:
trunk/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java
trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java
Log:
mainly added reset of remote bindings on bridge failure + cluster test base speedup
Modified: trunk/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java 2009-12-03 19:50:45 UTC (rev 8539)
+++ trunk/src/main/org/hornetq/core/server/cluster/MessageFlowRecord.java 2009-12-03 20:09:44 UTC (rev 8540)
@@ -34,5 +34,7 @@
Bridge getBridge();
void close() throws Exception;
+
+ void reset() throws Exception;
}
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-12-03 19:50:45 UTC (rev 8539)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-12-03 20:09:44 UTC (rev 8540)
@@ -564,6 +564,8 @@
{
if (flowRecord != null)
{
+ flowRecord.reset();
+
if (notifConsumer != null)
{
try
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-12-03 19:50:45 UTC (rev 8539)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java 2009-12-03 20:09:44 UTC (rev 8540)
@@ -492,6 +492,11 @@
clearBindings();
}
+
+ public void reset() throws Exception
+ {
+ clearBindings();
+ }
public void setBridge(final Bridge bridge)
{
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-12-03 19:50:45 UTC (rev 8539)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java 2009-12-03 20:09:44 UTC (rev 8540)
@@ -16,6 +16,7 @@
import static org.hornetq.core.remoting.impl.invm.TransportConstants.SERVER_ID_PROP_NAME;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -122,9 +123,13 @@
final ClientConsumer consumer;
final ClientSession session;
+
+ final int id;
- ConsumerHolder(final ClientConsumer consumer, final ClientSession session)
+ ConsumerHolder(final int id, final ClientConsumer consumer, final ClientSession session)
{
+ this.id = id;
+
this.consumer = consumer;
this.session = session;
@@ -162,11 +167,8 @@
{
messageCount = getMessageCount(po, address);
- // log.info(node + " messageCount " + messageCount);
-
if (messageCount == count)
- {
- // log.info("Waited " + (System.currentTimeMillis() - start));
+ {
return;
}
@@ -174,8 +176,6 @@
}
while (System.currentTimeMillis() - start < WAIT_TIMEOUT);
- // System.out.println(threadDump(" - fired by ClusterTestBase::waitForBindings"));
-
throw new IllegalStateException("Timed out waiting for messages (messageCount = " + messageCount +
", expecting = " +
count);
@@ -250,11 +250,9 @@
}
}
- // log.info(node + " binding count " + bindingCount + " consumer Count " + totConsumers);
if (bindingCount == count && totConsumers == consumerCount)
- {
- // log.info("Waited " + (System.currentTimeMillis() - start));
+ {
return;
}
@@ -360,7 +358,7 @@
session.start();
- consumers[consumerID] = new ConsumerHolder(consumer, session);
+ consumers[consumerID] = new ConsumerHolder(consumerID, consumer, session);
}
catch (Exception e)
{
@@ -799,71 +797,118 @@
*/
protected void verifyReceiveRoundRobinInSomeOrder(int numMessages, int... consumerIDs) throws Exception
{
+ if (numMessages < consumerIDs.length)
+ {
+ throw new IllegalStateException("You must send more messages than consumers specified or the algorithm " +
+ "won't work");
+ }
+
verifyReceiveRoundRobinInSomeOrder(true, numMessages, consumerIDs);
}
+
+ class OrderedConsumerHolder implements Comparable<OrderedConsumerHolder>
+ {
+ ConsumerHolder consumer;
+ int order;
+
+ public int compareTo(OrderedConsumerHolder o)
+ {
+ int thisOrder = this.order;
+ int otherOrder = o.order;
+ return (thisOrder < otherOrder ? -1 : (thisOrder == otherOrder ? 0 : 1));
+ }
+ }
+
+
protected void verifyReceiveRoundRobinInSomeOrder(boolean ack, int numMessages, int... consumerIDs) throws Exception
{
- Map<Integer, Integer> countMap = new HashMap<Integer, Integer>();
+ if (numMessages < consumerIDs.length)
+ {
+ throw new IllegalStateException("not enough messages");
+ }
+
+ // First get one from each consumer to determine the order, then we sort them in this order
- Set<Integer> counts = new HashSet<Integer>();
-
+ List<OrderedConsumerHolder> sorted = new ArrayList<OrderedConsumerHolder>();
+
for (int i = 0; i < consumerIDs.length; i++)
{
ConsumerHolder holder = consumers[consumerIDs[i]];
-
- if (holder == null)
+
+ ClientMessage msg = holder.consumer.receive(10000);
+
+ assertNotNull(msg);
+
+ int count = msg.getIntProperty(COUNT_PROP);
+
+ OrderedConsumerHolder orderedHolder = new OrderedConsumerHolder();
+
+ orderedHolder.consumer = holder;
+ orderedHolder.order = count;
+
+ sorted.add(orderedHolder);
+
+ if (ack)
{
- throw new IllegalArgumentException("No consumer at " + consumerIDs[i]);
+ msg.acknowledge();
}
+ }
+
+ //Now sort them
+
+ Collections.sort(sorted);
+
+ //First verify the first lot received are ok
+
+ int count = 0;
+
+ for (OrderedConsumerHolder holder: sorted)
+ {
+ if (holder.order != count)
+ {
+ throw new IllegalStateException("Out of order");
+ }
+
+ count++;
+ }
+
+ //Now check the rest are in order too
- ClientMessage message;
- do
+ outer: while (count < numMessages)
+ {
+ for (OrderedConsumerHolder holder: sorted)
{
- message = holder.consumer.receive(1000);
+ ClientMessage msg = holder.consumer.consumer.receive(10000);
- if (message != null)
+ assertNotNull(msg);
+
+ int p = msg.getIntProperty(COUNT_PROP);
+
+ if (p != count)
{
- int count = (Integer)message.getObjectProperty(COUNT_PROP);
-
- Integer prevCount = countMap.get(i);
-
- if (prevCount != null)
- {
- assertEquals("consumer " + i + " received unround-robined message (previous was " + prevCount + ")",
- prevCount + consumerIDs.length,
- count);
- }
-
- assertFalse(counts.contains(count));
-
- counts.add(count);
-
- countMap.put(i, count);
-
- if (ack)
- {
- message.acknowledge();
- }
-
- // log.info("consumer " + consumerIDs[i] + " returns " + count);
+ throw new IllegalStateException("Out of order 2");
}
- else
+
+ if (ack)
{
- // log.info("consumer " + consumerIDs[i] +" returns null");
+ msg.acknowledge();
}
+
+ count++;
+
+ if (count == numMessages)
+ {
+ break outer;
+ }
+
}
- while (message != null);
- }
-
- for (int i = 0; i < numMessages; i++)
- {
- assertTrue("did not receive message " + i, counts.contains(i));
- }
+ }
}
-
+
+
protected void verifyReceiveRoundRobinInSomeOrderWithCounts(boolean ack, int[] messageCounts, int... consumerIDs) throws Exception
- {
+ {
List<LinkedList<Integer>> receivedCounts = new ArrayList<LinkedList<Integer>>();
Set<Integer> counts = new HashSet<Integer>();
@@ -940,7 +985,6 @@
assertEquals(messageCounts[i], elem);
- // log.info("got elem " + messageCounts[i] + " at pos " + index);
index++;
@@ -954,6 +998,12 @@
protected void verifyReceiveRoundRobinInSomeOrderNoAck(int numMessages, int... consumerIDs) throws Exception
{
+ if (numMessages < consumerIDs.length)
+ {
+ throw new IllegalStateException("You must send more messages than consumers specified or the algorithm " +
+ "won't work");
+ }
+
verifyReceiveRoundRobinInSomeOrder(false, numMessages, consumerIDs);
}
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2009-12-03 19:50:45 UTC (rev 8539)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/distribution/SymmetricClusterTest.java 2009-12-03 20:09:44 UTC (rev 8540)
@@ -202,15 +202,22 @@
public void testRoundRobinMultipleQueues() throws Exception
{
+ log.info("starting");
setupCluster();
+ log.info("setup cluster");
+
startServers();
+ log.info("started servers");
+
setupSessionFactory(0, isNetty());
setupSessionFactory(1, isNetty());
setupSessionFactory(2, isNetty());
setupSessionFactory(3, isNetty());
setupSessionFactory(4, isNetty());
+
+ log.info("Set up session factories");
createQueue(0, "queues.testaddress", "queue0", null, false);
createQueue(1, "queues.testaddress", "queue0", null, false);
@@ -229,6 +236,8 @@
createQueue(2, "queues.testaddress", "queue2", null, false);
createQueue(3, "queues.testaddress", "queue2", null, false);
createQueue(4, "queues.testaddress", "queue2", null, false);
+
+ log.info("created queues");
addConsumer(0, 0, "queue0", null);
addConsumer(1, 1, "queue0", null);
@@ -247,6 +256,8 @@
addConsumer(12, 2, "queue2", null);
addConsumer(13, 3, "queue2", null);
addConsumer(14, 4, "queue2", null);
+
+ log.info("added consumers");
waitForBindings(0, "queues.testaddress", 3, 3, true);
waitForBindings(1, "queues.testaddress", 3, 3, true);
@@ -259,12 +270,24 @@
waitForBindings(2, "queues.testaddress", 12, 12, false);
waitForBindings(3, "queues.testaddress", 12, 12, false);
waitForBindings(4, "queues.testaddress", 12, 12, false);
+
+ log.info("waited for bindings");
send(0, "queues.testaddress", 10, false, null);
+
+ log.info("sent messages");
verifyReceiveRoundRobinInSomeOrder(10, 0, 1, 2, 3, 4);
+
+ log.info("verified 1");
+
verifyReceiveRoundRobinInSomeOrder(10, 5, 6, 7, 8, 9);
+
+ log.info("verified 2");
+
verifyReceiveRoundRobinInSomeOrder(10, 10, 11, 12, 13, 14);
+
+ log.info("verified 3");
}
public void testMultipleNonLoadBalancedQueues() throws Exception
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java 2009-12-03 19:50:45 UTC (rev 8539)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java 2009-12-03 20:09:44 UTC (rev 8540)
@@ -1097,136 +1097,17 @@
*/
protected void doTestL(final ClientSessionFactory sf) throws Exception
{
- final int numSessions = 1000;
+ final int numSessions = 100;
for (int i = 0; i < numSessions; i++)
{
ClientSession session = sf.createSession(false, false, false);
- log.info("Created session " + System.identityHashCode(session));
-
session.close();
-
- log.info("closed session");
}
}
- // Browsers
- // FIXME - this test won't work until we use a proper iterator for browsing a queue.
- // Making a copy of the queue for a browser consumer doesn't work well with replication since
- // When replicating the create consumer (browser) to the backup, when executed on the backup the
- // backup may have different messages in its queue since been added on different threads.
- // So when replicating deliveries they may not be found.
- // https://jira.jboss.org/jira/browse/JBMESSAGING-1433
- // protected void doTestM(final ClientSessionFactory sf, final int threadNum) throws Exception
- // {
- // long start = System.currentTimeMillis();
- //
- // ClientSession sessSend = sf.createSession(false, true, true, false);
- //
- // ClientSession sessConsume = sf.createSession(false, true, true, false);
- //
- // sessConsume.createQueue(ADDRESS, new SimpleString(threadNum + "sub"), null, false, false);
- //
- // final int numMessages = 100;
- //
- // ClientProducer producer = sessSend.createProducer(ADDRESS);
- //
- // sendMessages(sessSend, producer, numMessages, threadNum);
- //
- // ClientConsumer browser = sessConsume.createConsumer(new SimpleString(threadNum + "sub"),
- // null, false, true);
- //
- // Map<Integer, Integer> consumerCounts = new HashMap<Integer, Integer>();
- //
- // for (int i = 0; i < numMessages; i++)
- // {
- // ClientMessage msg = browser.receive(RECEIVE_TIMEOUT);
- //
- // assertNotNull(msg);
- //
- // int tn = (Integer)msg.getProperty(new SimpleString("threadnum"));
- // int cnt = (Integer)msg.getProperty(new SimpleString("count"));
- //
- // Integer c = consumerCounts.get(tn);
- // if (c == null)
- // {
- // c = new Integer(cnt);
- // }
- //
- // if (cnt != c.intValue())
- // {
- // throw new Exception("Invalid count, expected " + c + " got " + cnt);
- // }
- //
- // c++;
- //
- // //Wrap
- // if (c == numMessages)
- // {
- // c = 0;
- // }
- //
- // consumerCounts.put(tn, c);
- //
- // msg.acknowledge();
- // }
- //
- // sessConsume.close();
- //
- // sessConsume = sf.createSession(false, true, true, false);
- //
- // browser = sessConsume.createConsumer(new SimpleString(threadNum + "sub"),
- // null, false, true);
- //
- // //Messages should still be there
- //
- // consumerCounts.clear();
- //
- // for (int i = 0; i < numMessages; i++)
- // {
- // ClientMessage msg = browser.receive(RECEIVE_TIMEOUT);
- //
- // assertNotNull(msg);
- //
- // int tn = (Integer)msg.getProperty(new SimpleString("threadnum"));
- // int cnt = (Integer)msg.getProperty(new SimpleString("count"));
- //
- // Integer c = consumerCounts.get(tn);
- // if (c == null)
- // {
- // c = new Integer(cnt);
- // }
- //
- // if (cnt != c.intValue())
- // {
- // throw new Exception("Invalid count, expected " + c + " got " + cnt);
- // }
- //
- // c++;
- //
- // //Wrap
- // if (c == numMessages)
- // {
- // c = 0;
- // }
- //
- // consumerCounts.put(tn, c);
- //
- // msg.acknowledge();
- // }
- //
- // sessConsume.close();
- //
- // sessSend.deleteQueue(new SimpleString(threadNum + "sub"));
- //
- // sessSend.close();
- //
- // long end = System.currentTimeMillis();
- //
- // log.info("duration " + (end - start));
- // }
-
+
protected void doTestN(final ClientSessionFactory sf, final int threadNum) throws Exception
{
ClientSession sessCreate = sf.createSession(false, true, true);
15 years, 1 month
JBoss hornetq SVN: r8539 - trunk/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-12-03 14:50:45 -0500 (Thu, 03 Dec 2009)
New Revision: 8539
Modified:
trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
Log:
tidied exception handling
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java 2009-12-03 19:19:10 UTC (rev 8538)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java 2009-12-03 19:50:45 UTC (rev 8539)
@@ -130,14 +130,13 @@
}
catch (Exception e)
{
- log.error("Failed to create session", e);
-
if (e instanceof HornetQException)
{
response = new HornetQExceptionMessage((HornetQException)e);
}
else
{
+ log.error("Failed to create session", e);
response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
}
}
@@ -155,14 +154,14 @@
}
catch (Exception e)
{
- log.error("Failed to reattach session", e);
-
if (e instanceof HornetQException)
{
response = new HornetQExceptionMessage((HornetQException)e);
}
else
{
+ log.error("Failed to reattach session", e);
+
response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
}
}
@@ -202,14 +201,13 @@
}
catch (Exception e)
{
- log.warn(e.getMessage(), e);
-
if (e instanceof HornetQException)
{
response = new HornetQExceptionMessage((HornetQException)e);
}
else
{
+ log.warn(e.getMessage(), e);
response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
}
}
15 years, 1 month
JBoss hornetq SVN: r8538 - trunk/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-12-03 14:19:10 -0500 (Thu, 03 Dec 2009)
New Revision: 8538
Modified:
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
Log:
tidied exception handling
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-12-03 19:01:40 UTC (rev 8537)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-12-03 19:19:10 UTC (rev 8538)
@@ -549,14 +549,13 @@
}
catch (Exception e)
{
- log.error("Failed to delete queue", e);
-
if (e instanceof HornetQException)
{
response = new HornetQExceptionMessage((HornetQException)e);
}
else
{
+ log.error("Failed to delete queue", e);
response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
}
}
@@ -604,14 +603,14 @@
}
catch (Exception e)
{
- log.error("Failed to execute queue query", e);
-
if (e instanceof HornetQException)
{
response = new HornetQExceptionMessage((HornetQException)e);
}
else
{
+ log.error("Failed to execute queue query", e);
+
response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
}
}
@@ -1552,8 +1551,6 @@
}
catch (Exception e)
{
- log.error("Failed to send message", e);
-
if (packet.isRequiresResponse())
{
if (e instanceof HornetQException)
@@ -1562,6 +1559,8 @@
}
else
{
+ log.error("Failed to send message", e);
+
response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
}
}
15 years, 1 month
JBoss hornetq SVN: r8537 - trunk/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-12-03 14:01:40 -0500 (Thu, 03 Dec 2009)
New Revision: 8537
Modified:
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
Log:
tidied exception handling
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-12-03 18:41:20 UTC (rev 8536)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-12-03 19:01:40 UTC (rev 8537)
@@ -441,14 +441,13 @@
}
catch (Exception e)
{
- log.error("Failed to create consumer", e);
-
if (e instanceof HornetQException)
{
response = new HornetQExceptionMessage((HornetQException)e);
}
else
{
+ log.error("Failed to create consumer", e);
response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
}
}
15 years, 1 month