JBoss hornetq SVN: r7975 - trunk/tests/src/org/hornetq/tests/integration/clientcrash.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-09-21 13:18:28 -0400 (Mon, 21 Sep 2009)
New Revision: 7975
Modified:
trunk/tests/src/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java
Log:
fixed typo in comment
Modified: trunk/tests/src/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java 2009-09-21 12:53:21 UTC (rev 7974)
+++ trunk/tests/src/org/hornetq/tests/integration/clientcrash/ClientCrashTest.java 2009-09-21 17:18:28 UTC (rev 7975)
@@ -64,7 +64,7 @@
{
assertActiveConnections(0);
- // spawn a JVM that creates a JMS client, which waits to receive a test
+ // spawn a JVM that creates a Core client, which waits to receive a test
// message
Process p = SpawnedVMSupport.spawnVM(CrashClient.class.getName());
16 years, 3 months
JBoss hornetq SVN: r7974 - trunk/src/main/org/hornetq/ra.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-09-21 08:53:21 -0400 (Mon, 21 Sep 2009)
New Revision: 7974
Modified:
trunk/src/main/org/hornetq/ra/HornetQRASessionFactoryImpl.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-139 - fix
Modified: trunk/src/main/org/hornetq/ra/HornetQRASessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/ra/HornetQRASessionFactoryImpl.java 2009-09-19 15:48:25 UTC (rev 7973)
+++ trunk/src/main/org/hornetq/ra/HornetQRASessionFactoryImpl.java 2009-09-21 12:53:21 UTC (rev 7974)
@@ -996,6 +996,7 @@
info.setUserName(userName);
info.setPassword(password);
info.setClientID(clientID);
+ info.setDefaults(((HornetQResourceAdapter)mcf.getResourceAdapter()).getProperties());
if (trace)
{
16 years, 3 months
JBoss hornetq SVN: r7973 - in trunk: tests/src/org/hornetq/tests/integration/management and 1 other directory.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-09-19 11:48:25 -0400 (Sat, 19 Sep 2009)
New Revision: 7973
Modified:
trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
trunk/tests/src/org/hornetq/tests/integration/management/ManagementControlHelper.java
trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java
Log:
fixed test
Modified: trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2009-09-18 16:59:05 UTC (rev 7972)
+++ trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2009-09-19 15:48:25 UTC (rev 7973)
@@ -413,19 +413,16 @@
public void pause()
{
- log.info("calling pause");
queue.pause();
}
public void resume()
{
- log.info("calling resume");
queue.resume();
}
public boolean isPaused() throws Exception
{
- log.info("calling isPaused");
return queue.isPaused();
}
Modified: trunk/tests/src/org/hornetq/tests/integration/management/ManagementControlHelper.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/ManagementControlHelper.java 2009-09-18 16:59:05 UTC (rev 7972)
+++ trunk/tests/src/org/hornetq/tests/integration/management/ManagementControlHelper.java 2009-09-19 15:48:25 UTC (rev 7973)
@@ -52,65 +52,57 @@
public static AcceptorControl createAcceptorControl(String name, MBeanServer mbeanServer) throws Exception
{
- return (AcceptorControl)createProxy(ObjectNames.getAcceptorObjectName(name),
- AcceptorControl.class,
- mbeanServer);
+ return (AcceptorControl)createProxy(ObjectNames.getAcceptorObjectName(name), AcceptorControl.class, mbeanServer);
}
public static BroadcastGroupControl createBroadcastGroupControl(String name, MBeanServer mbeanServer) throws Exception
{
return (BroadcastGroupControl)createProxy(ObjectNames.getBroadcastGroupObjectName(name),
- BroadcastGroupControl.class,
- mbeanServer);
+ BroadcastGroupControl.class,
+ mbeanServer);
}
public static DiscoveryGroupControl createDiscoveryGroupControl(String name, MBeanServer mbeanServer) throws Exception
{
return (DiscoveryGroupControl)createProxy(ObjectNames.getDiscoveryGroupObjectName(name),
- DiscoveryGroupControl.class,
- mbeanServer);
+ DiscoveryGroupControl.class,
+ mbeanServer);
}
public static BridgeControl createBridgeControl(String name, MBeanServer mbeanServer) throws Exception
{
- return (BridgeControl)createProxy(ObjectNames.getBridgeObjectName(name),
- BridgeControl.class,
- mbeanServer);
+ return (BridgeControl)createProxy(ObjectNames.getBridgeObjectName(name), BridgeControl.class, mbeanServer);
}
public static DivertControl createDivertControl(String name, MBeanServer mbeanServer) throws Exception
{
return (DivertControl)createProxy(ObjectNames.getDivertObjectName(new SimpleString(name)),
- DivertControl.class,
- mbeanServer);
+ DivertControl.class,
+ mbeanServer);
}
public static ClusterConnectionControl createClusterConnectionControl(String name, MBeanServer mbeanServer) throws Exception
{
return (ClusterConnectionControl)createProxy(ObjectNames.getClusterConnectionObjectName(name),
- ClusterConnectionControl.class,
- mbeanServer);
+ ClusterConnectionControl.class,
+ mbeanServer);
}
public static HornetQServerControl createHornetQServerControl(MBeanServer mbeanServer) throws Exception
{
return (HornetQServerControl)createProxy(ObjectNames.getHornetQServerObjectName(),
- HornetQServerControl.class,
- mbeanServer);
+ HornetQServerControl.class,
+ mbeanServer);
}
public static QueueControl createQueueControl(SimpleString address, SimpleString name, MBeanServer mbeanServer) throws Exception
{
- return (QueueControl)createProxy(ObjectNames.getQueueObjectName(address, name),
- QueueControl.class,
- mbeanServer);
+ return (QueueControl)createProxy(ObjectNames.getQueueObjectName(address, name), QueueControl.class, mbeanServer);
}
public static AddressControl createAddressControl(SimpleString address, MBeanServer mbeanServer) throws Exception
{
- return (AddressControl)createProxy(ObjectNames.getAddressObjectName(address),
- AddressControl.class,
- mbeanServer);
+ return (AddressControl)createProxy(ObjectNames.getAddressObjectName(address), AddressControl.class, mbeanServer);
}
public static JMSQueueControl createJMSQueueControl(Queue queue, MBeanServer mbeanServer) throws Exception
@@ -120,30 +112,26 @@
public static JMSQueueControl createJMSQueueControl(String name, MBeanServer mbeanServer) throws Exception
{
- return (JMSQueueControl)createProxy(ObjectNames.getJMSQueueObjectName(name),
- JMSQueueControl.class,
- mbeanServer);
+ return (JMSQueueControl)createProxy(ObjectNames.getJMSQueueObjectName(name), JMSQueueControl.class, mbeanServer);
}
public static JMSServerControl createJMSServerControl(MBeanServer mbeanServer) throws Exception
{
- return (JMSServerControl)createProxy(ObjectNames.getJMSServerObjectName(),
- JMSServerControl.class,
- mbeanServer);
+ return (JMSServerControl)createProxy(ObjectNames.getJMSServerObjectName(), JMSServerControl.class, mbeanServer);
}
public static ConnectionFactoryControl createConnectionFactoryControl(String name, MBeanServer mbeanServer) throws Exception
{
return (ConnectionFactoryControl)createProxy(ObjectNames.getConnectionFactoryObjectName(name),
- ConnectionFactoryControl.class,
- mbeanServer);
+ ConnectionFactoryControl.class,
+ mbeanServer);
}
public static TopicControl createTopicControl(Topic topic, MBeanServer mbeanServer) throws Exception
{
return (TopicControl)createProxy(ObjectNames.getJMSTopicObjectName(topic.getTopicName()),
- TopicControl.class,
- mbeanServer);
+ TopicControl.class,
+ mbeanServer);
}
// Constructors --------------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java 2009-09-18 16:59:05 UTC (rev 7972)
+++ trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java 2009-09-19 15:48:25 UTC (rev 7973)
@@ -1316,6 +1316,7 @@
protected QueueControl createManagementControl(SimpleString address, SimpleString queue) throws Exception
{
QueueControl queueControl = createQueueControl(address, queue, mbeanServer);
+
return queueControl;
}
Modified: trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java 2009-09-18 16:59:05 UTC (rev 7972)
+++ trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java 2009-09-19 15:48:25 UTC (rev 7973)
@@ -259,7 +259,7 @@
public void resume() throws Exception
{
- proxy.invokeOperation("pause");
+ proxy.invokeOperation("resume");
}
public boolean isPaused() throws Exception
16 years, 3 months
JBoss hornetq SVN: r7972 - in trunk: src/main/org/hornetq/core/management/impl and 6 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-09-18 12:59:05 -0400 (Fri, 18 Sep 2009)
New Revision: 7972
Modified:
trunk/src/main/org/hornetq/core/management/QueueControl.java
trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
trunk/src/main/org/hornetq/core/management/jmx/impl/ReplicationAwareQueueControlWrapper.java
trunk/src/main/org/hornetq/core/server/Queue.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-82 merged patch
Modified: trunk/src/main/org/hornetq/core/management/QueueControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/QueueControl.java 2009-09-18 16:11:12 UTC (rev 7971)
+++ trunk/src/main/org/hornetq/core/management/QueueControl.java 2009-09-18 16:59:05 UTC (rev 7972)
@@ -124,4 +124,13 @@
@Operation(desc = "List the message counters history HTML", impact = INFO)
String listMessageCounterHistoryAsHTML() throws Exception;
+
+ @Operation(desc = "Pauses the Queue", impact = ACTION)
+ void pause() throws Exception;
+
+ @Operation(desc = "Resumes delivery of queued messages and gets the queue out of paused state.", impact = ACTION)
+ void resume() throws Exception;
+
+ @Operation(desc = "Inspects if the queue is paused", impact = INFO)
+ boolean isPaused() throws Exception;
}
Modified: trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2009-09-18 16:11:12 UTC (rev 7971)
+++ trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2009-09-18 16:59:05 UTC (rev 7972)
@@ -19,6 +19,7 @@
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.filter.impl.FilterImpl;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.management.MessageCounterInfo;
import org.hornetq.core.management.QueueControl;
import org.hornetq.core.message.Message;
@@ -42,8 +43,9 @@
*/
public class QueueControlImpl implements QueueControl
{
-
// Constants -----------------------------------------------------
+
+ private static final Logger log = Logger.getLogger(QueueControlImpl.class);
// Attributes ----------------------------------------------------
@@ -65,11 +67,11 @@
for (int i = 0; i < messages.length; i++)
{
Map<String, Object> message = messages[i];
- array.put(new JSONObject(message));
+ array.put(new JSONObject(message));
}
return array.toString();
}
-
+
/**
* Returns null if the string is null or empty
*/
@@ -78,7 +80,8 @@
if (filterStr == null || filterStr.trim().length() == 0)
{
return null;
- } else
+ }
+ else
{
return new FilterImpl(new SimpleString(filterStr));
}
@@ -87,9 +90,9 @@
// Constructors --------------------------------------------------
public QueueControlImpl(final Queue queue,
- final String address,
- final PostOffice postOffice,
- final HierarchicalRepository<AddressSettings> addressSettingsRepository)
+ final String address,
+ final PostOffice postOffice,
+ final HierarchicalRepository<AddressSettings> addressSettingsRepository)
{
this.queue = queue;
this.address = address;
@@ -211,12 +214,12 @@
AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
SimpleString sExpiryAddress = new SimpleString(expiryAddress);
-
+
if (expiryAddress != null)
{
addressSettings.setExpiryAddress(sExpiryAddress);
}
-
+
queue.setExpiryAddress(sExpiryAddress);
}
@@ -232,14 +235,14 @@
}
return messages;
}
-
+
public String listScheduledMessagesAsJSON() throws Exception
{
return toJSON(listScheduledMessages());
}
public Map<String, Object>[] listMessages(final String filterStr) throws Exception
- {
+ {
try
{
Filter filter = createFilter(filterStr);
@@ -258,7 +261,7 @@
throw new IllegalStateException(e.getMessage());
}
}
-
+
public String listMessagesAsJSON(String filter) throws Exception
{
return toJSON(listMessages(filter));
@@ -408,6 +411,24 @@
return MessageCounterHelper.listMessageCounterHistoryAsHTML(new MessageCounter[] { counter });
}
+ public void pause()
+ {
+ log.info("calling pause");
+ queue.pause();
+ }
+
+ public void resume()
+ {
+ log.info("calling resume");
+ queue.resume();
+ }
+
+ public boolean isPaused() throws Exception
+ {
+ log.info("calling isPaused");
+ return queue.isPaused();
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/management/jmx/impl/ReplicationAwareQueueControlWrapper.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/jmx/impl/ReplicationAwareQueueControlWrapper.java 2009-09-18 16:11:12 UTC (rev 7971)
+++ trunk/src/main/org/hornetq/core/management/jmx/impl/ReplicationAwareQueueControlWrapper.java 2009-09-18 16:59:05 UTC (rev 7972)
@@ -246,6 +246,21 @@
info.getNotifications());
}
+ public boolean isPaused() throws Exception
+ {
+ return localQueueControl.isPaused();
+ }
+
+ public void pause() throws Exception
+ {
+ localQueueControl.pause();
+ }
+
+ public void resume() throws Exception
+ {
+ localQueueControl.resume();
+ }
+
// Public --------------------------------------------------------
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java 2009-09-18 16:11:12 UTC (rev 7971)
+++ trunk/src/main/org/hornetq/core/server/Queue.java 2009-09-18 16:59:05 UTC (rev 7972)
@@ -150,4 +150,22 @@
Iterator<MessageReference> iterator();
void setExpiryAddress(SimpleString expiryAddress);
+ /**
+ * Pauses the queue. It will receive messages but won't give them to the consumers until resumed.
+ * If a queue is paused, pausing it again will only throw a warning.
+ * To check if a queue is paused, invoke <i>isPaused()</i>
+ */
+ void pause();
+ /**
+ * Resumes the delivery of message for the queue.
+ * If a queue is resumed, resuming it again will only throw a warning.
+ * To check if a queue is resumed, invoke <i>isPaused()</i>
+ */
+ void resume();
+ /**
+ *
+ * @return true if paused, false otherwise.
+ */
+ boolean isPaused();
+
}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-09-18 16:11:12 UTC (rev 7971)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-09-18 16:59:05 UTC (rev 7972)
@@ -113,6 +113,8 @@
private final AtomicBoolean waitingToDeliver = new AtomicBoolean(false);
+ private boolean paused;
+
private final Runnable deliverRunner = new DeliverRunner();
private final PagingManager pagingManager;
@@ -146,7 +148,7 @@
private final Map<Consumer, Iterator<MessageReference>> iterators = new HashMap<Consumer, Iterator<MessageReference>>();
private ConcurrentMap<SimpleString, Consumer> groups = new ConcurrentHashMap<SimpleString, Consumer>();
-
+
private volatile SimpleString expiryAddress;
public QueueImpl(final long persistenceID,
@@ -192,7 +194,7 @@
direct = true;
scheduledDeliveryHandler = new ScheduledDeliveryHandlerImpl(scheduledExecutor);
-
+
if (addressSettingsRepository != null)
{
expiryAddress = addressSettingsRepository.getMatch(address.toString()).getExpiryAddress();
@@ -202,7 +204,7 @@
expiryAddress = null;
}
}
-
+
// Bindable implementation -------------------------------------------------------------------------------------
public SimpleString getRoutingName()
@@ -748,21 +750,21 @@
messageReferences.addFirst(reference, reference.getMessage().getPriority());
}
}
- }
+ }
public void expire(final MessageReference ref) throws Exception
- {
+ {
log.info("expiring ref " + this.expiryAddress);
if (expiryAddress != null)
{
- move(expiryAddress, ref, true);
+ move(expiryAddress, ref, true);
}
else
- {
+ {
acknowledge(ref);
}
}
-
+
public void setExpiryAddress(final SimpleString expiryAddress)
{
this.expiryAddress = expiryAddress;
@@ -1289,7 +1291,7 @@
// with the live node. Instead, when we replicate the delivery we remove
// the ref from the queue
- if (backup)
+ if (backup || paused)
{
return;
}
@@ -1307,11 +1309,11 @@
Iterator<MessageReference> iterator = null;
- //TODO - this needs to be optimised!! Creating too much stuff on an inner loop
+ // TODO - this needs to be optimised!! Creating too much stuff on an inner loop
int totalConsumers = distributionPolicy.getConsumerCount();
Set<Consumer> busyConsumers = new HashSet<Consumer>();
Set<Consumer> nullReferences = new HashSet<Consumer>();
-
+
while (true)
{
consumer = distributionPolicy.getNextConsumer();
@@ -1331,7 +1333,7 @@
else
{
reference = null;
-
+
if (consumer.getFilter() != null)
{
// we have iterated on the whole queue for
@@ -1344,7 +1346,7 @@
if (reference == null)
{
- nullReferences.add(consumer);
+ nullReferences.add(consumer);
if (nullReferences.size() + busyConsumers.size() == totalConsumers)
{
startDepaging();
@@ -1358,10 +1360,10 @@
else
{
nullReferences.remove(consumer);
-
+
if (reference.getMessage().isExpired())
{
- //We expire messages on the server too
+ // We expire messages on the server too
if (iterator == null)
{
messageReferences.removeFirst();
@@ -1370,9 +1372,9 @@
{
iterator.remove();
}
-
+
referenceHandled();
-
+
try
{
expire(reference);
@@ -1381,7 +1383,7 @@
{
log.error("Failed to expire ref", e);
}
-
+
continue;
}
}
@@ -1447,7 +1449,7 @@
boolean add = false;
- if (direct && !backup)
+ if (direct && !backup && !paused)
{
// Deliver directly
@@ -1667,7 +1669,7 @@
if (message.decrementRefCount() == 0 && store != null)
{
- store.addSize(-ref.getMessage().getMemoryEstimate());
+ store.addSize(-ref.getMessage().getMemoryEstimate());
}
}
@@ -1732,6 +1734,7 @@
{
public void run()
{
+
// Must be set to false *before* executing to avoid race
waitingToDeliver.set(false);
@@ -1887,4 +1890,24 @@
}
}
}
+
+ public synchronized void pause()
+ {
+ paused = true;
+
+ log.info("Paused is now " + paused);
+ }
+
+ public synchronized void resume()
+ {
+ paused = false;
+
+ deliver();
+ }
+
+ public synchronized boolean isPaused()
+ {
+ log.info("return ispaused " + paused);
+ return paused;
+ }
}
Modified: trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java 2009-09-18 16:11:12 UTC (rev 7971)
+++ trunk/tests/src/org/hornetq/tests/integration/management/QueueControlTest.java 2009-09-18 16:59:05 UTC (rev 7972)
@@ -32,6 +32,7 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.management.DayCounterInfo;
import org.hornetq.core.management.HornetQServerControl;
import org.hornetq.core.management.MessageCounterInfo;
@@ -307,7 +308,7 @@
session.deleteQueue(queue);
}
-
+
public void testListScheduledMessagesAsJSON() throws Exception
{
long delay = 2000;
@@ -369,7 +370,7 @@
consumer.close();
session.deleteQueue(queue);
}
-
+
public void testListMessagesAsJSONWithNullFilter() throws Exception
{
SimpleString address = randomSimpleString();
@@ -384,7 +385,7 @@
message.putIntProperty(new SimpleString("key"), intValue);
producer.send(message);
- String jsonString = queueControl.listMessagesAsJSON(null);
+ String jsonString = queueControl.listMessagesAsJSON(null);
assertNotNull(jsonString);
JSONArray array = new JSONArray(jsonString);
assertEquals(1, array.length());
@@ -392,7 +393,7 @@
consumeMessages(1, session, queue);
- jsonString = queueControl.listMessagesAsJSON(null);
+ jsonString = queueControl.listMessagesAsJSON(null);
assertNotNull(jsonString);
array = new JSONArray(jsonString);
assertEquals(0, array.length());
@@ -455,7 +456,7 @@
session.deleteQueue(queue);
}
-
+
public void testListMessagesWithEmptyFilter() throws Exception
{
SimpleString address = randomSimpleString();
@@ -478,7 +479,7 @@
session.deleteQueue(queue);
}
-
+
public void testListMessagesAsJSONWithFilter() throws Exception
{
SimpleString key = new SimpleString("key");
@@ -515,7 +516,7 @@
session.deleteQueue(queue);
}
-
+
/**
* <ol>
* <li>send a message to queue</li>
@@ -636,8 +637,7 @@
assertEquals(2, queueControl.getMessageCount());
// moved matching messages to otherQueue
- int movedMatchedMessagesCount = queueControl.moveMessages(key + " =" + matchingValue,
- otherQueue.toString());
+ int movedMatchedMessagesCount = queueControl.moveMessages(key + " =" + matchingValue, otherQueue.toString());
assertEquals(1, movedMatchedMessagesCount);
assertEquals(1, queueControl.getMessageCount());
@@ -682,7 +682,7 @@
assertEquals(0, otherQueueControl.getMessageCount());
// the message IDs are set on the server
- Map<String, Object>[] messages = queueControl.listMessages(null);
+ Map<String, Object>[] messages = queueControl.listMessages(null);
assertEquals(2, messages.length);
long messageID = (Long)messages[0].get("messageID");
@@ -714,7 +714,7 @@
assertEquals(1, queueControl.getMessageCount());
// the message IDs are set on the server
- Map<String, Object>[] messages = queueControl.listMessages(null);
+ Map<String, Object>[] messages = queueControl.listMessages(null);
assertEquals(1, messages.length);
long messageID = (Long)messages[0].get("messageID");
@@ -805,7 +805,7 @@
int removedMatchedMessagesCount = queueControl.removeMessages(null);
assertEquals(2, removedMatchedMessagesCount);
assertEquals(0, queueControl.getMessageCount());
-
+
session.deleteQueue(queue);
}
@@ -828,11 +828,10 @@
int removedMatchedMessagesCount = queueControl.removeMessages("");
assertEquals(2, removedMatchedMessagesCount);
assertEquals(0, queueControl.getMessageCount());
-
+
session.deleteQueue(queue);
}
-
-
+
public void testRemoveMessage() throws Exception
{
SimpleString address = randomSimpleString();
@@ -849,7 +848,7 @@
assertEquals(2, queueControl.getMessageCount());
// the message IDs are set on the server
- Map<String, Object>[] messages = queueControl.listMessages(null);
+ Map<String, Object>[] messages = queueControl.listMessages(null);
assertEquals(2, messages.length);
long messageID = (Long)messages[0].get("messageID");
@@ -958,7 +957,7 @@
assertEquals(0, expiryQueueControl.getMessageCount());
// the message IDs are set on the server
- Map<String, Object>[] messages = queueControl.listMessages(null);
+ Map<String, Object>[] messages = queueControl.listMessages(null);
assertEquals(1, messages.length);
long messageID = (Long)messages[0].get("messageID");
@@ -996,7 +995,7 @@
assertEquals(2, queueControl.getMessageCount());
// the message IDs are set on the server
- Map<String, Object>[] messages = queueControl.listMessages(null);
+ Map<String, Object>[] messages = queueControl.listMessages(null);
assertEquals(2, messages.length);
long messageID = (Long)messages[0].get("messageID");
@@ -1015,7 +1014,7 @@
consumeMessages(1, session, deadLetterQueue);
session.deleteQueue(queue);
- session.deleteQueue(deadLetterQueue);
+ session.deleteQueue(deadLetterQueue);
}
public void testChangeMessagePriority() throws Exception
@@ -1037,7 +1036,7 @@
assertEquals(1, queueControl.getMessageCount());
// the message IDs are set on the server
- Map<String, Object>[] messages = queueControl.listMessages(null);
+ Map<String, Object>[] messages = queueControl.listMessages(null);
assertEquals(1, messages.length);
long messageID = (Long)messages[0].get("messageID");
@@ -1070,7 +1069,7 @@
assertEquals(1, queueControl.getMessageCount());
// the message IDs are set on the server
- Map<String, Object>[] messages = queueControl.listMessages(null);
+ Map<String, Object>[] messages = queueControl.listMessages(null);
assertEquals(1, messages.length);
long messageID = (Long)messages[0].get("messageID");
@@ -1099,14 +1098,14 @@
session.createQueue(address, queue, null, false);
QueueControl queueControl = createManagementControl(address, queue);
-
+
HornetQServerControl serverControl = createHornetQServerControl(mbeanServer);
serverControl.enableMessageCounters();
serverControl.setMessageCounterSamplePeriod(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD);
String jsonString = queueControl.listMessageCounter();
MessageCounterInfo info = MessageCounterInfo.fromJSON(jsonString);
-
+
assertEquals(0, info.getDepth());
assertEquals(0, info.getCount());
@@ -1133,7 +1132,7 @@
consumeMessages(2, session, queue);
- Thread.sleep(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD * 2);
+ Thread.sleep(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD * 2);
jsonString = queueControl.listMessageCounter();
info = MessageCounterInfo.fromJSON(jsonString);
assertEquals(0, info.getDepth());
@@ -1143,7 +1142,7 @@
session.deleteQueue(queue);
}
-
+
public void testResetMessageCounter() throws Exception
{
SimpleString address = randomSimpleString();
@@ -1151,7 +1150,7 @@
session.createQueue(address, queue, null, false);
QueueControl queueControl = createManagementControl(address, queue);
-
+
HornetQServerControl serverControl = createHornetQServerControl(mbeanServer);
serverControl.enableMessageCounters();
serverControl.setMessageCounterSamplePeriod(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD);
@@ -1175,7 +1174,7 @@
consumeMessages(1, session, queue);
- Thread.sleep(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD * 2);
+ Thread.sleep(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD * 2);
jsonString = queueControl.listMessageCounter();
info = MessageCounterInfo.fromJSON(jsonString);
assertEquals(0, info.getDepth());
@@ -1184,18 +1183,18 @@
assertEquals(0, info.getCountDelta());
queueControl.resetMessageCounter();
-
- Thread.sleep(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD * 2);
+
+ Thread.sleep(MessageCounterManagerImpl.MIN_SAMPLE_PERIOD * 2);
jsonString = queueControl.listMessageCounter();
info = MessageCounterInfo.fromJSON(jsonString);
assertEquals(0, info.getDepth());
assertEquals(0, info.getDepthDelta());
assertEquals(0, info.getCount());
assertEquals(0, info.getCountDelta());
-
+
session.deleteQueue(queue);
}
-
+
public void testListMessageCounterAsHTML() throws Exception
{
SimpleString address = randomSimpleString();
@@ -1203,10 +1202,10 @@
session.createQueue(address, queue, null, false);
QueueControl queueControl = createManagementControl(address, queue);
-
+
String history = queueControl.listMessageCounterAsHTML();
assertNotNull(history);
-
+
session.deleteQueue(queue);
}
@@ -1218,7 +1217,7 @@
session.createQueue(address, queue, null, false);
QueueControl queueControl = createManagementControl(address, queue);
-
+
HornetQServerControl serverControl = createHornetQServerControl(mbeanServer);
serverControl.enableMessageCounters();
serverControl.setMessageCounterSamplePeriod(counterPeriod);
@@ -1238,7 +1237,7 @@
session.createQueue(address, queue, null, false);
QueueControl queueControl = createManagementControl(address, queue);
-
+
HornetQServerControl serverControl = createHornetQServerControl(mbeanServer);
serverControl.enableMessageCounters();
serverControl.setMessageCounterSamplePeriod(counterPeriod);
@@ -1247,8 +1246,36 @@
assertNotNull(history);
session.deleteQueue(queue);
+
}
-
+
+ public void testPauseAndResume()
+ {
+ long counterPeriod = 1000;
+ SimpleString address = randomSimpleString();
+ SimpleString queue = randomSimpleString();
+
+ try
+ {
+ session.createQueue(address, queue, null, false);
+ QueueControl queueControl = createManagementControl(address, queue);
+
+ HornetQServerControl serverControl = createHornetQServerControl(mbeanServer);
+ serverControl.enableMessageCounters();
+ serverControl.setMessageCounterSamplePeriod(counterPeriod);
+ assertFalse(queueControl.isPaused());
+ queueControl.pause();
+ assertTrue(queueControl.isPaused());
+ queueControl.resume();
+ assertFalse(queueControl.isPaused());
+ }
+ catch (Exception e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -1278,9 +1305,9 @@
session.close();
server.stop();
-
+
session = null;
-
+
server = null;
super.tearDown();
Modified: trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java 2009-09-18 16:11:12 UTC (rev 7971)
+++ trunk/tests/src/org/hornetq/tests/integration/management/QueueControlUsingCoreTest.java 2009-09-18 16:59:05 UTC (rev 7972)
@@ -252,6 +252,21 @@
proxy.invokeOperation("setExpiryAddress", expiryAddres);
}
+ public void pause() throws Exception
+ {
+ proxy.invokeOperation("pause");
+ }
+
+ public void resume() throws Exception
+ {
+ proxy.invokeOperation("pause");
+ }
+
+ public boolean isPaused() throws Exception
+ {
+ return (Boolean)proxy.invokeOperation("isPaused");
+ }
+
};
}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2009-09-18 16:11:12 UTC (rev 7971)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2009-09-18 16:59:05 UTC (rev 7972)
@@ -517,4 +517,31 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#isPaused()
+ */
+ public boolean isPaused()
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#pause()
+ */
+ public void pause()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#resume()
+ */
+ public void resume()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
\ No newline at end of file
Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2009-09-18 16:11:12 UTC (rev 7971)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2009-09-18 16:59:05 UTC (rev 7972)
@@ -13,7 +13,6 @@
package org.hornetq.tests.unit.core.server.impl;
-
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@@ -76,7 +75,6 @@
assertEquals(name, queue.getName());
}
-
public void testDurable()
{
Queue queue = new QueueImpl(1, address1, queue1, null, false, false, scheduledExecutor, null, null, null);
@@ -194,7 +192,7 @@
}
- public void testSimpleDirectDelivery() throws Exception
+ public void testSimpleDirectDelivery() throws Exception
{
Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
@@ -222,7 +220,7 @@
assertRefListsIdenticalRefs(refs, consumer.getReferences());
}
- public void testSimpleNonDirectDelivery() throws Exception
+ public void testSimpleNonDirectDelivery() throws Exception
{
Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
@@ -243,7 +241,7 @@
assertEquals(0, queue.getScheduledCount());
assertEquals(0, queue.getDeliveringCount());
- //Now add a consumer
+ // Now add a consumer
FakeConsumer consumer = new FakeConsumer();
queue.addConsumer(consumer);
@@ -260,7 +258,7 @@
assertEquals(numMessages, queue.getDeliveringCount());
}
- public void testBusyConsumer() throws Exception
+ public void testBusyConsumer() throws Exception
{
Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
@@ -425,10 +423,18 @@
assertRefListsIdenticalRefs(allRefs, consumer.getReferences());
}
-
public void testChangeConsumersAndDeliver() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, new FakePostOffice(), null, null);
+ Queue queue = new QueueImpl(1,
+ address1,
+ queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ new FakePostOffice(),
+ null,
+ null);
final int numMessages = 10;
@@ -609,7 +615,7 @@
}
catch (IllegalStateException e)
{
- //Ok
+ // Ok
}
}
@@ -623,7 +629,7 @@
List<MessageReference> refs = new ArrayList<MessageReference>();
- //Test first with queueing
+ // Test first with queueing
for (int i = 0; i < numMessages; i++)
{
@@ -713,7 +719,7 @@
{
MessageReference ref = generateReference(queue, i);
- ref.getMessage().setPriority((byte) i);
+ ref.getMessage().setPriority((byte)i);
refs.add(ref);
@@ -728,7 +734,7 @@
List<MessageReference> receivedRefs = consumer.getReferences();
- //Should be in reverse order
+ // Should be in reverse order
assertEquals(refs.size(), receivedRefs.size());
@@ -737,8 +743,8 @@
assertEquals(refs.get(i), receivedRefs.get(9 - i));
}
- //But if we send more - since we are now in direct mode - the order will be the send order
- //since the refs don't get queued
+ // But if we send more - since we are now in direct mode - the order will be the send order
+ // since the refs don't get queued
consumer.clearReferences();
@@ -748,7 +754,7 @@
{
MessageReference ref = generateReference(queue, i);
- ref.getMessage().setPriority((byte) i);
+ ref.getMessage().setPriority((byte)i);
refs.add(ref);
@@ -839,7 +845,16 @@
public void testConsumeWithFiltersAddAndRemoveConsumer() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, new FakePostOffice(), null, null);
+ Queue queue = new QueueImpl(1,
+ address1,
+ queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ new FakePostOffice(),
+ null,
+ null);
Filter filter = new FakeFilter("fruit", "orange");
@@ -863,7 +878,6 @@
refs.add(ref2);
-
assertEquals(2, queue.getMessageCount());
assertEquals(1, consumer.getReferences().size());
@@ -880,7 +894,6 @@
queue.deliverNow();
-
refs.clear();
consumer.clearReferences();
@@ -1020,7 +1033,6 @@
assertEquals(0, queue.getScheduledCount());
assertEquals(10, queue.getDeliveringCount());
-
for (int i = numMessages * 2; i < numMessages * 3; i++)
{
MessageReference ref = generateReference(queue, i);
@@ -1038,12 +1050,20 @@
assertEquals(20, queue.getDeliveringCount());
}
-
// Private ------------------------------------------------------------------------------
private void testConsumerWithFilters(boolean direct) throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, new FakePostOffice(), null, null);
+ Queue queue = new QueueImpl(1,
+ address1,
+ queue1,
+ null,
+ false,
+ true,
+ scheduledExecutor,
+ new FakePostOffice(),
+ null,
+ null);
Filter filter = new FakeFilter("fruit", "orange");
@@ -1139,11 +1159,11 @@
queue.addFirst(messageReference);
queue.addLast(messageReference2);
queue.addFirst(messageReference3);
-
+
assertEquals(0, consumer.getReferences().size());
queue.addConsumer(consumer);
queue.deliverNow();
-
+
assertEquals(3, consumer.getReferences().size());
assertEquals(messageReference3, consumer.getReferences().get(0));
assertEquals(messageReference, consumer.getReferences().get(1));
@@ -1162,7 +1182,6 @@
assertEquals(queue.getMessagesAdded(), 3);
}
-
public void testGetReference() throws Exception
{
Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
@@ -1189,16 +1208,120 @@
}
+ /**
+ * Test the paused and resumed states with async deliveries.
+ * @throws Exception
+ */
+ public void testPauseAndResumeWithAsync() throws Exception
+ {
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
+
+ // pauses the queue
+ queue.pause();
+ final int numMessages = 10;
+
+ List<MessageReference> refs = new ArrayList<MessageReference>();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ MessageReference ref = generateReference(queue, i);
+
+ refs.add(ref);
+
+ queue.addLast(ref);
+ }
+ // even as this queue is paused, it will receive the messages anyway
+ assertEquals(10, queue.getMessageCount());
+ assertEquals(0, queue.getScheduledCount());
+ assertEquals(0, queue.getDeliveringCount());
+
+ // Now add a consumer
+ FakeConsumer consumer = new FakeConsumer();
+
+ queue.addConsumer(consumer);
+
+ assertTrue(consumer.getReferences().isEmpty());
+ assertEquals(10, queue.getMessageCount());
+ assertEquals(0, queue.getScheduledCount());
+ // explicit order of delivery
+ queue.deliverNow();
+ // As the queue is paused, even an explicit order of delivery will not work.
+ assertEquals(0, consumer.getReferences().size());
+ assertEquals(numMessages, queue.getMessageCount());
+ assertEquals(0, queue.getScheduledCount());
+ assertEquals(0, queue.getDeliveringCount());
+ // resuming work
+ queue.resume();
+
+ // after resuming the delivery begins.
+ assertRefListsIdenticalRefs(refs, consumer.getReferences());
+ assertEquals(numMessages, queue.getMessageCount());
+ assertEquals(0, queue.getScheduledCount());
+ assertEquals(numMessages, queue.getDeliveringCount());
+
+ }
+
+ /**
+ * Test the paused and resumed states with direct deliveries.
+ * @throws Exception
+ */
+
+ public void testPauseAndResumeWithDirect() throws Exception
+ {
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, scheduledExecutor, null, null, null);
+
+ // Now add a consumer
+ FakeConsumer consumer = new FakeConsumer();
+
+ queue.addConsumer(consumer);
+
+ // brings to queue to paused state
+ queue.pause();
+
+ final int numMessages = 10;
+
+ List<MessageReference> refs = new ArrayList<MessageReference>();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ MessageReference ref = generateReference(queue, i);
+ refs.add(ref);
+ queue.addLast(ref);
+ }
+
+ // the queue even if it's paused will receive the message but won't forward
+ // directly to the consumer until resumed.
+ assertEquals(numMessages, queue.getMessageCount());
+ assertEquals(0, queue.getScheduledCount());
+ assertEquals(0, queue.getDeliveringCount());
+ assertTrue(consumer.getReferences().isEmpty());
+
+ // brings the queue to resumed state.
+ queue.resume();
+ // resuming delivery of messages
+ assertRefListsIdenticalRefs(refs, consumer.getReferences());
+ assertEquals(numMessages, queue.getMessageCount());
+ assertEquals(numMessages, queue.getDeliveringCount());
+
+ }
+
class AddtoQueueRunner implements Runnable
{
Queue queue;
+
MessageReference messageReference;
+
boolean added = false;
+
CountDownLatch countDownLatch;
+
boolean first;
- public AddtoQueueRunner(boolean first, Queue queue, MessageReference messageReference, CountDownLatch countDownLatch)
+ public AddtoQueueRunner(boolean first,
+ Queue queue,
+ MessageReference messageReference,
+ CountDownLatch countDownLatch)
{
this.queue = queue;
this.messageReference = messageReference;
@@ -1226,7 +1349,7 @@
Consumer consumer;
public List<Consumer> getConsumers()
- {
+ {
return null;
}
16 years, 3 months
JBoss hornetq SVN: r7971 - trunk/src/main/org/hornetq/jms/bridge/impl.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-09-18 12:11:12 -0400 (Fri, 18 Sep 2009)
New Revision: 7971
Modified:
trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
Log:
HORNETQ-27: Race condition in JMS Bridge between enlisting the XAResource in the TX and message delivery
* make sure the SoureReceiver thread is joined when the bridge is stopped
Modified: trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2009-09-18 14:02:22 UTC (rev 7970)
+++ trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2009-09-18 16:11:12 UTC (rev 7971)
@@ -135,6 +135,8 @@
private Thread checkerThread;
+ private Thread sourceReceiver;
+
private long batchExpiryTime;
private boolean paused;
@@ -276,8 +278,8 @@
if (trace) { log.trace("Started time checker thread"); }
}
- Thread receiver = new SourceReceiver();
- receiver.start();
+ sourceReceiver = new SourceReceiver();
+ sourceReceiver.start();
if (trace) { log.trace("Started " + this); }
}
@@ -308,6 +310,11 @@
{
checkerThread.interrupt();
}
+
+ if (sourceReceiver != null)
+ {
+ sourceReceiver.interrupt();
+ }
}
//This must be outside sync block
@@ -320,6 +327,16 @@
if (trace) { log.trace("Checker thread has finished"); }
}
+ //This must be outside sync block
+ if (sourceReceiver != null)
+ {
+ if (trace) { log.trace("Waiting for source receiver thread to finish");}
+
+ sourceReceiver.join();
+
+ if (trace) { log.trace("Source receiver thread has finished"); }
+ }
+
if (tx != null)
{
//Terminate any transaction
@@ -1426,11 +1443,11 @@
@Override
public void run()
{
- while(isStarted())
+ while(started)
{
synchronized (lock)
{
- if (isPaused() || failed)
+ if (paused || failed)
{
try
{
16 years, 3 months
JBoss hornetq SVN: r7970 - in trunk: src/main/org/hornetq/core/config and 4 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-09-18 10:02:22 -0400 (Fri, 18 Sep 2009)
New Revision: 7970
Modified:
trunk/src/config/common/schema/hornetq-configuration.xsd
trunk/src/main/org/hornetq/core/config/Configuration.java
trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/core/server/impl/MemoryManagerImpl.java
trunk/tests/config/ConfigurationTest-full-config.xml
trunk/tests/src/org/hornetq/tests/unit/core/config/impl/ConfigurationImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java
Log:
HORNETQ-138: Memory Manager parameters should be configurable
* Parameters are retrieved using the Configuration interface
* added memory-warning-threshold & memory-measure-interval to XML config
Modified: trunk/src/config/common/schema/hornetq-configuration.xsd
===================================================================
--- trunk/src/config/common/schema/hornetq-configuration.xsd 2009-09-18 12:17:47 UTC (rev 7969)
+++ trunk/src/config/common/schema/hornetq-configuration.xsd 2009-09-18 14:02:22 UTC (rev 7970)
@@ -166,6 +166,10 @@
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="server-dump-interval" type="xsd:long">
</xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0" name="memory-warning-threshold" type="xsd:int">
+ </xsd:element>
+ <xsd:element maxOccurs="1" minOccurs="0" name="memory-measure-interval" type="xsd:long">
+ </xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="large-messages-directory" type="xsd:string">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" name="security-settings">
Modified: trunk/src/main/org/hornetq/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/Configuration.java 2009-09-18 12:17:47 UTC (rev 7969)
+++ trunk/src/main/org/hornetq/core/config/Configuration.java 2009-09-18 14:02:22 UTC (rev 7970)
@@ -243,6 +243,14 @@
void setServerDumpInterval(long interval);
+ int getMemoryWarningThreshold();
+
+ void setMemoryWarningThreshold(int memoryWarningThreshold);
+
+ long getMemoryMeasureInterval();
+
+ void setMemoryMeasureInterval(long memoryMeasureInterval);
+
// Paging Properties --------------------------------------------------------------------
String getPagingDirectory();
@@ -288,4 +296,5 @@
void setMessageExpiryThreadPriority(int messageExpiryThreadPriority);
+
}
Modified: trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2009-09-18 12:17:47 UTC (rev 7969)
+++ trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2009-09-18 14:02:22 UTC (rev 7970)
@@ -158,6 +158,10 @@
public static final long DEFAULT_SERVER_DUMP_INTERVAL = -1;
+ public static final int DEFAULT_MEMORY_WARNING_THRESHOLD = 25;
+
+ public static final long DEFAULT_MEMORY_MEASURE_INTERVAL = 3000; // in milliseconds
+
// Attributes -----------------------------------------------------------------------------
protected boolean clustered = DEFAULT_CLUSTERED;
@@ -282,6 +286,11 @@
protected long serverDumpInterval = DEFAULT_SERVER_DUMP_INTERVAL;
+ // percentage of free memory which triggers warning from the memory manager
+ protected int memoryWarningThreshold = DEFAULT_MEMORY_WARNING_THRESHOLD;
+
+ protected long memoryMeasureInterval = DEFAULT_MEMORY_MEASURE_INTERVAL;
+
// Public -------------------------------------------------------------------------
public void start() throws Exception
@@ -1051,5 +1060,24 @@
{
this.serverDumpInterval = intervalInMilliseconds;
}
+
+ public int getMemoryWarningThreshold()
+ {
+ return memoryWarningThreshold ;
+ }
+
+ public void setMemoryWarningThreshold(int memoryWarningThreshold)
+ {
+ this.memoryWarningThreshold = memoryWarningThreshold;
+ }
+ public long getMemoryMeasureInterval()
+ {
+ return memoryMeasureInterval ;
+ }
+
+ public void setMemoryMeasureInterval(long memoryMeasureInterval)
+ {
+ this.memoryMeasureInterval = memoryMeasureInterval;
+ }
}
Modified: trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-09-18 12:17:47 UTC (rev 7969)
+++ trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-09-18 14:02:22 UTC (rev 7970)
@@ -326,6 +326,10 @@
serverDumpInterval = getLong(e, "server-dump-interval", serverDumpInterval, MINUS_ONE_OR_GT_ZERO); // in milliseconds
+ memoryWarningThreshold = getInteger(e, "memory-warning-threshold", memoryWarningThreshold, PERCENTAGE);
+
+ memoryMeasureInterval = getLong(e, "memory-measure-interval", memoryMeasureInterval, GT_ZERO); // in milliseconds
+
started = true;
}
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-09-18 12:17:47 UTC (rev 7969)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-09-18 14:02:22 UTC (rev 7970)
@@ -907,7 +907,7 @@
scheduledPool,
managementConnectorID);
- memoryManager = new MemoryManagerImpl();
+ memoryManager = new MemoryManagerImpl(configuration.getMemoryWarningThreshold(), configuration.getMemoryMeasureInterval());
memoryManager.start();
}
Modified: trunk/src/main/org/hornetq/core/server/impl/MemoryManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/MemoryManagerImpl.java 2009-09-18 12:17:47 UTC (rev 7969)
+++ trunk/src/main/org/hornetq/core/server/impl/MemoryManagerImpl.java 2009-09-18 14:02:22 UTC (rev 7970)
@@ -29,17 +29,11 @@
{
private static final Logger log = Logger.getLogger(MemoryManagerImpl.class);
- private static final long DEFAULT_MEASURE_INTERVAL = 3000;
-
- private static final int DEFAULT_FREE_MEMORY_PERCENT = 25;
-
private Runtime runtime;
- //TODO Should be configurable
private long measureInterval;
- //TODO Should be configurable
- private int freeMemoryPercent;
+ private int memoryWarningThreshold;
private volatile boolean started;
@@ -47,13 +41,13 @@
private volatile boolean low;
- public MemoryManagerImpl()
+ public MemoryManagerImpl(int memoryWarningThreshold, long measureInterval)
{
runtime = Runtime.getRuntime();
- this.measureInterval = DEFAULT_MEASURE_INTERVAL;
+ this.measureInterval = measureInterval;
- this.freeMemoryPercent = DEFAULT_FREE_MEMORY_PERCENT;
+ this.memoryWarningThreshold = memoryWarningThreshold;
}
public boolean isMemoryLow()
@@ -69,7 +63,7 @@
public synchronized void start()
{
log.debug("Starting MemoryManager with MEASURE_INTERVAL: " + measureInterval
- + " FREE_MEMORY_PERCENT: " + freeMemoryPercent);
+ + " FREE_MEMORY_PERCENT: " + memoryWarningThreshold);
if (started)
{
@@ -151,9 +145,9 @@
log.debug(info);
}
- if (availableMemoryPercent <= freeMemoryPercent)
+ if (availableMemoryPercent <= memoryWarningThreshold)
{
- log.warn("Less than " + freeMemoryPercent + "%\n"
+ log.warn("Less than " + memoryWarningThreshold + "%\n"
+ info +
"\nYou are in danger of running out of RAM. Have you set paging parameters " +
"on your addresses? (See user manual \"Paging\" chapter)");
Modified: trunk/tests/config/ConfigurationTest-full-config.xml
===================================================================
--- trunk/tests/config/ConfigurationTest-full-config.xml 2009-09-18 12:17:47 UTC (rev 7969)
+++ trunk/tests/config/ConfigurationTest-full-config.xml 2009-09-18 14:02:22 UTC (rev 7970)
@@ -46,6 +46,8 @@
<journal-min-files>100</journal-min-files>
<journal-max-aio>56546</journal-max-aio>
<large-messages-directory>largemessagesdir</large-messages-directory>
+ <memory-warning-threshold>95</memory-warning-threshold>
+ <memory-measure-interval>54321</memory-measure-interval>
<remoting-interceptors>
<class-name>org.hornetq.tests.unit.core.config.impl.TestInterceptor1</class-name>
Modified: trunk/tests/src/org/hornetq/tests/unit/core/config/impl/ConfigurationImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/config/impl/ConfigurationImplTest.java 2009-09-18 12:17:47 UTC (rev 7969)
+++ trunk/tests/src/org/hornetq/tests/unit/core/config/impl/ConfigurationImplTest.java 2009-09-18 14:02:22 UTC (rev 7970)
@@ -91,7 +91,9 @@
assertEquals(ConfigurationImpl.DEFAULT_MANAGEMENT_REQUEST_TIMEOUT, conf.getManagementRequestTimeout());
assertEquals(ConfigurationImpl.DEFAULT_ID_CACHE_SIZE, conf.getIDCacheSize());
assertEquals(ConfigurationImpl.DEFAULT_PERSIST_ID_CACHE, conf.isPersistIDCache());
-
+ assertEquals(ConfigurationImpl.DEFAULT_SERVER_DUMP_INTERVAL, conf.getServerDumpInterval());
+ assertEquals(ConfigurationImpl.DEFAULT_MEMORY_WARNING_THRESHOLD, conf.getMemoryWarningThreshold());
+ assertEquals(ConfigurationImpl.DEFAULT_MEMORY_MEASURE_INTERVAL, conf.getMemoryMeasureInterval());
}
public void testSetGetAttributes()
Modified: trunk/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java 2009-09-18 12:17:47 UTC (rev 7969)
+++ trunk/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java 2009-09-18 14:02:22 UTC (rev 7970)
@@ -79,6 +79,7 @@
assertEquals(33, conf.getJournalCompactPercentage());
assertEquals(56546, conf.getJournalMaxAIO());
assertEquals("largemessagesdir", conf.getLargeMessagesDirectory());
+ assertEquals(95, conf.getMemoryWarningThreshold());
assertEquals(2, conf.getInterceptorClassNames().size());
assertTrue(conf.getInterceptorClassNames().contains("org.hornetq.tests.unit.core.config.impl.TestInterceptor1"));
16 years, 3 months
JBoss hornetq SVN: r7969 - trunk/src/main/org/hornetq/jms/bridge/impl.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-09-18 08:17:47 -0400 (Fri, 18 Sep 2009)
New Revision: 7969
Modified:
trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
Log:
HORNETQ-27: Race condition in JMS Bridge between enlisting the XAResource in the TX and message delivery
* replace the source's MessageListener by polling the source Consumer in a thread to ensure there is no
race condition between tx resource enlistment and message delivery
Modified: trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2009-09-17 22:02:17 UTC (rev 7968)
+++ trunk/src/main/org/hornetq/jms/bridge/impl/JMSBridgeImpl.java 2009-09-18 12:17:47 UTC (rev 7969)
@@ -13,13 +13,12 @@
package org.hornetq.jms.bridge.impl;
+import java.lang.reflect.Method;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
-import java.lang.reflect.Method;
-import java.lang.reflect.InvocationTargetException;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -48,7 +47,6 @@
import org.hornetq.jms.bridge.QualityOfServiceMode;
import org.hornetq.jms.client.HornetQMessage;
import org.hornetq.jms.client.HornetQSession;
-import org.jboss.tm.TransactionManagerLocator;
/**
*
@@ -278,6 +276,9 @@
if (trace) { log.trace("Started time checker thread"); }
}
+ Thread receiver = new SourceReceiver();
+ receiver.start();
+
if (trace) { log.trace("Started " + this); }
}
else
@@ -285,6 +286,7 @@
log.warn("Failed to start bridge");
handleFailureOnStartup();
}
+
}
public synchronized void stop() throws Exception
@@ -1027,8 +1029,6 @@
}
targetProducer = sess.createProducer(null);
-
- sourceConsumer.setMessageListener(new SourceListener());
return true;
}
@@ -1414,6 +1414,79 @@
// Inner classes ---------------------------------------------------------------
+
+ /**
+ * We use a Thread which polls the sourceDestination instead of a MessageListener
+ * to ensure that message delivery does not happen concurrently with
+ * transaction enlistment of the XAResource (see HORNETQ-27)
+ *
+ */
+ private final class SourceReceiver extends Thread
+ {
+ @Override
+ public void run()
+ {
+ while(isStarted())
+ {
+ synchronized (lock)
+ {
+ if (isPaused() || failed)
+ {
+ try
+ {
+ lock.wait(500);
+ }
+ catch (InterruptedException e)
+ {
+ if (trace) { log.trace(this + " thread was interrupted"); }
+ }
+ continue;
+ }
+
+ Message msg = null;
+ try
+ {
+ msg = sourceConsumer.receive(1000);
+ }
+ catch (JMSException jmse)
+ {
+ if (trace) { log.trace(this + " exception while receiving a message", jmse); }
+ }
+
+ if (msg == null)
+ {
+ try
+ {
+ lock.wait(500);
+ }
+ catch (InterruptedException e)
+ {
+ if (trace) { log.trace(this + " thread was interrupted"); }
+ }
+ continue;
+ }
+
+ if (trace) { log.trace(this + " received message " + msg); }
+
+ messages.add(msg);
+
+ batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
+
+ if (trace) { log.trace(this + " rescheduled batchExpiryTime to " + batchExpiryTime); }
+
+ if (maxBatchSize != -1 && messages.size() >= maxBatchSize)
+ {
+ if (trace) { log.trace(this + " maxBatchSize has been reached so sending batch"); }
+
+ sendBatch();
+
+ if (trace) { log.trace(this + " sent batch"); }
+ }
+ }
+ }
+ }
+ }
+
private class FailureHandler implements Runnable
{
/**
@@ -1575,40 +1648,6 @@
}
}
- private class SourceListener implements MessageListener
- {
- public void onMessage(Message msg)
- {
- synchronized (lock)
- {
- if (failed)
- {
- //Ignore the message
- if (trace) { log.trace("JMSBridge has failed so ignoring message"); }
-
- return;
- }
-
- if (trace) { log.trace(this + " received message " + msg); }
-
- messages.add(msg);
-
- batchExpiryTime = System.currentTimeMillis() + maxBatchTime;
-
- if (trace) { log.trace(this + " rescheduled batchExpiryTime to " + batchExpiryTime); }
-
- if (maxBatchSize != -1 && messages.size() >= maxBatchSize)
- {
- if (trace) { log.trace(this + " maxBatchSize has been reached so sending batch"); }
-
- sendBatch();
-
- if (trace) { log.trace(this + " sent batch"); }
- }
- }
- }
- }
-
private class BridgeExceptionListener implements ExceptionListener
{
public void onException(JMSException e)
16 years, 3 months
JBoss hornetq SVN: r7968 - in trunk: src/main/org/hornetq/core/management/impl and 3 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-09-17 18:02:17 -0400 (Thu, 17 Sep 2009)
New Revision: 7968
Modified:
trunk/src/main/org/hornetq/core/management/HornetQServerControl.java
trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
trunk/src/main/org/hornetq/core/management/jmx/impl/ReplicationAwareHornetQServerControlWrapper.java
trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java
trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java
trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/QueueDeployerTest.java
Log:
Adding properties to ServerControl
Modified: trunk/src/main/org/hornetq/core/management/HornetQServerControl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/HornetQServerControl.java 2009-09-17 14:08:00 UTC (rev 7967)
+++ trunk/src/main/org/hornetq/core/management/HornetQServerControl.java 2009-09-17 22:02:17 UTC (rev 7968)
@@ -60,6 +60,12 @@
int getJournalMinFiles();
int getJournalMaxAIO();
+
+ int getJournalCompactMinFiles();
+
+ int getJournalCompactPercentage();
+
+ boolean isPersistenceEnabled();
boolean isCreateBindingsDir();
Modified: trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2009-09-17 14:08:00 UTC (rev 7967)
+++ trunk/src/main/org/hornetq/core/management/impl/HornetQServerControlImpl.java 2009-09-17 22:02:17 UTC (rev 7968)
@@ -171,7 +171,22 @@
{
return configuration.getJournalMinFiles();
}
+
+ public int getJournalCompactMinFiles()
+ {
+ return configuration.getJournalCompactMinFiles();
+ }
+ public int getJournalCompactPercentage()
+ {
+ return configuration.getJournalCompactPercentage();
+ }
+
+ public boolean isPersistenceEnabled()
+ {
+ return configuration.isPersistenceEnabled();
+ }
+
public String getJournalType()
{
return configuration.getJournalType().toString();
@@ -656,4 +671,5 @@
{
return configuration.isWildcardRoutingEnabled();
}
+
}
Modified: trunk/src/main/org/hornetq/core/management/jmx/impl/ReplicationAwareHornetQServerControlWrapper.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/jmx/impl/ReplicationAwareHornetQServerControlWrapper.java 2009-09-17 14:08:00 UTC (rev 7967)
+++ trunk/src/main/org/hornetq/core/management/jmx/impl/ReplicationAwareHornetQServerControlWrapper.java 2009-09-17 22:02:17 UTC (rev 7968)
@@ -43,7 +43,7 @@
// Constructors --------------------------------------------------
public ReplicationAwareHornetQServerControlWrapper(final HornetQServerControlImpl localControl,
- final ReplicationOperationInvoker replicationInvoker) throws Exception
+ final ReplicationOperationInvoker replicationInvoker) throws Exception
{
super(ResourceNames.CORE_SERVER, HornetQServerControl.class, replicationInvoker);
@@ -71,7 +71,7 @@
{
return localControl.getConnectionCount();
}
-
+
public String[] getInterceptorClassNames()
{
return localControl.getInterceptorClassNames();
@@ -221,17 +221,17 @@
{
return localControl.getConnectors();
}
-
+
public String getConnectorsAsJSON() throws Exception
{
return localControl.getConnectorsAsJSON();
}
-
+
public String[] getAddressNames()
{
return localControl.getAddressNames();
}
-
+
public String[] getQueueNames()
{
return localControl.getQueueNames();
@@ -392,6 +392,21 @@
return localControl.isWildcardRoutingEnabled();
}
+ public int getJournalCompactMinFiles()
+ {
+ return localControl.getJournalCompactMinFiles();
+ }
+
+ public int getJournalCompactPercentage()
+ {
+ return localControl.getJournalCompactPercentage();
+ }
+
+ public boolean isPersistenceEnabled()
+ {
+ return localControl.isPersistenceEnabled();
+ }
+
// StandardMBean overrides ---------------------------------------
@Override
Modified: trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java 2009-09-17 14:08:00 UTC (rev 7967)
+++ trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlTest.java 2009-09-17 22:02:17 UTC (rev 7968)
@@ -119,6 +119,9 @@
assertEquals(conf.getTransactionTimeoutScanPeriod(), serverControl.getTransactionTimeoutScanPeriod());
assertEquals(conf.getMessageExpiryScanPeriod(), serverControl.getMessageExpiryScanPeriod());
assertEquals(conf.getMessageExpiryThreadPriority(), serverControl.getMessageExpiryThreadPriority());
+ assertEquals(conf.getJournalCompactMinFiles(), serverControl.getJournalCompactMinFiles());
+ assertEquals(conf.getJournalCompactPercentage(), serverControl.getJournalCompactPercentage());
+ assertEquals(conf.isPersistenceEnabled(), serverControl.isPersistenceEnabled());
}
public void testGetConnectors() throws Exception
Modified: trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java 2009-09-17 14:08:00 UTC (rev 7967)
+++ trunk/tests/src/org/hornetq/tests/integration/management/HornetQServerControlUsingCoreTest.java 2009-09-17 22:02:17 UTC (rev 7968)
@@ -253,16 +253,6 @@
return (String)proxy.retrieveAttributeValue("pagingDirectory");
}
- public int getGlobalPageSize()
- {
- return (Integer)proxy.retrieveAttributeValue("globalPageSize");
- }
-
- public long getPagingMaxGlobalSizeBytes()
- {
- return (Long)proxy.retrieveAttributeValue("pagingMaxGlobalSizeBytes", Long.class);
- }
-
public long getQueueActivationTimeout()
{
return (Long)proxy.retrieveAttributeValue("queueActivationTimeout", Long.class);
@@ -423,6 +413,21 @@
return (Integer)proxy.retrieveAttributeValue("AIOBufferTimeout");
}
+ public int getJournalCompactMinFiles()
+ {
+ return (Integer)proxy.retrieveAttributeValue("JournalCompactMinFiles");
+ }
+
+ public int getJournalCompactPercentage()
+ {
+ return (Integer)proxy.retrieveAttributeValue("JournalCompactPercentage");
+ }
+
+ public boolean isPersistenceEnabled()
+ {
+ return (Boolean)proxy.retrieveAttributeValue("PersistenceEnabled");
+ }
+
};
}
// Package protected ---------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/QueueDeployerTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/QueueDeployerTest.java 2009-09-17 14:08:00 UTC (rev 7967)
+++ trunk/tests/src/org/hornetq/tests/unit/core/deployers/impl/QueueDeployerTest.java 2009-09-17 22:02:17 UTC (rev 7968)
@@ -339,18 +339,6 @@
return null;
}
- public int getGlobalPageSize()
- {
-
- return 0;
- }
-
- public long getPagingMaxGlobalSizeBytes()
- {
-
- return 0;
- }
-
public long getQueueActivationTimeout()
{
@@ -520,29 +508,29 @@
}
- /* (non-Javadoc)
- * @see org.hornetq.core.management.HornetQServerControlMBean#getAIOBufferSize()
- */
public int getAIOBufferSize()
{
return 0;
}
- /* (non-Javadoc)
- * @see org.hornetq.core.management.HornetQServerControlMBean#getAIOBufferTimeout()
- */
public int getAIOBufferTimeout()
{
return 0;
}
- /* (non-Javadoc)
- * @see org.hornetq.core.management.HornetQServerControlMBean#getAIOFlushOnSync()
- */
- public boolean isAIOFlushOnSync()
+ public int getJournalCompactMinFiles()
{
+ return 0;
+ }
+ public int getJournalCompactPercentage()
+ {
+ return 0;
+ }
+
+ public boolean isPersistenceEnabled()
+ {
return false;
}
16 years, 3 months
JBoss hornetq SVN: r7967 - branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-09-17 10:08:00 -0400 (Thu, 17 Sep 2009)
New Revision: 7967
Added:
branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor.java
Log:
added missing file
Added: branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor.java
===================================================================
--- branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor.java (rev 0)
+++ branches/Branch_Replication_Changes/tests/src/org/hornetq/tests/integration/cluster/failover/DelayInterceptor.java 2009-09-17 14:08:00 UTC (rev 7967)
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.cluster.failover;
+
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.remoting.Interceptor;
+import org.hornetq.core.remoting.Packet;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
+
+/**
+ * A DelayInterceptor
+ *
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
+ *
+ *
+ */
+public class DelayInterceptor implements Interceptor
+{
+ public boolean intercept(Packet packet, RemotingConnection connection) throws HornetQException
+ {
+ if (packet.getType() == PacketImpl.SESS_SEND)
+ {
+ try
+ {
+ Thread.sleep(2000);
+ }
+ catch (Exception e)
+ {
+ }
+ }
+
+ return true;
+ }
+}
16 years, 3 months
JBoss hornetq SVN: r7966 - trunk/src/main/org/hornetq/ra/inflow.
by do-not-reply@jboss.org
Author: ataylor
Date: 2009-09-17 05:54:17 -0400 (Thu, 17 Sep 2009)
New Revision: 7966
Modified:
trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-134 - reverted to option B which is now followed exactly but left in local tx optimisation which is off by default
Modified: trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java 2009-09-17 08:54:21 UTC (rev 7965)
+++ trunk/src/main/org/hornetq/ra/inflow/HornetQMessageHandler.java 2009-09-17 09:54:17 UTC (rev 7966)
@@ -13,12 +13,15 @@
package org.hornetq.ra.inflow;
import java.util.UUID;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
import javax.jms.InvalidClientIDException;
import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.resource.spi.endpoint.MessageEndpoint;
import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.resource.ResourceException;
import javax.transaction.Status;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
@@ -66,10 +69,7 @@
private final HornetQActivation activation;
- /**
- * The transaction demarcation strategy factory
- */
- private final DemarcationStrategyFactory strategyFactory = new DemarcationStrategyFactory();
+ private boolean useLocalTx;
public HornetQMessageHandler(final HornetQActivation activation, final ClientSession session)
{
@@ -103,7 +103,7 @@
SimpleString queueName = new SimpleString(HornetQTopic.createQueueNameForDurableSubscription(activation.getActivationSpec()
.getClientID(),
- subscriptionName));
+ subscriptionName));
SessionQueueQueryResponseMessage subResponse = session.queueQuery(queueName);
@@ -160,6 +160,7 @@
// Create the endpoint, if we are transacted pass the sesion so it is enlisted, unless using Local TX
MessageEndpointFactory endpointFactory = activation.getMessageEndpointFactory();
+ useLocalTx = !activation.isDeliveryTransacted() && activation.getActivationSpec().isUseLocalTx();
if (activation.isDeliveryTransacted() && !activation.getActivationSpec().isUseLocalTx())
{
endpoint = endpointFactory.createEndpoint(session);
@@ -213,321 +214,49 @@
log.trace("onMessage(" + message + ")");
}
- TransactionDemarcationStrategy txnStrategy = strategyFactory.getStrategy();
- try
- {
- txnStrategy.start();
- }
- catch (Throwable throwable)
- {
- log.warn("Unable to create transaction: " + throwable.getMessage());
- txnStrategy = new NoTXTransactionDemarcationStrategy();
- }
-
HornetQMessage msg = HornetQMessage.createMessage(message, session);
-
+ boolean beforeDelivery = false;
try
{
+ endpoint.beforeDelivery(HornetQActivation.ONMESSAGE);
+ beforeDelivery = true;
msg.doBeforeReceive();
message.acknowledge();
- }
- catch (Exception e)
- {
- log.error("Failed to prepare message for receipt", e);
-
- return;
- }
-
- try
- {
((MessageListener) endpoint).onMessage(msg);
- }
- catch (Throwable t)
- {
- log.error("Unexpected error delivering message " + message, t);
- txnStrategy.error();
- }
- finally
- {
- txnStrategy.end();
- }
- }
-
- /**
- * Demarcation strategy factory
- */
- private class DemarcationStrategyFactory
- {
- /**
- * Get the transaction demarcation strategy
- *
- * @return The strategy
- */
- TransactionDemarcationStrategy getStrategy()
- {
- if (trace)
+ endpoint.afterDelivery();
+ if(useLocalTx)
{
- log.trace("getStrategy()");
+ session.commit();
}
-
- if (activation.isDeliveryTransacted())
- {
- if (!activation.getActivationSpec().isUseLocalTx())
- {
- try
- {
- return new XATransactionDemarcationStrategy();
- }
- catch (Throwable t)
- {
- log.error(this + " error creating transaction demarcation ", t);
- }
- }
- else
- {
- return new LocalDemarcationStrategy();
- }
-
- }
- else
- {
- if (!activation.getActivationSpec().isUseLocalTx())
- {
- return new NoTXTransactionDemarcationStrategy();
- }
- else
- {
- return new LocalDemarcationStrategy();
- }
- }
-
- return null;
}
- }
-
- /**
- * Transaction demarcation strategy
- */
- private interface TransactionDemarcationStrategy
- {
- /*
- * Start
- */
- void start() throws Throwable;
-
- /**
- * Error
- */
- void error();
-
- /**
- * End
- */
- void end();
- }
-
- /**
- * Local demarcation strategy
- */
- private class LocalDemarcationStrategy implements TransactionDemarcationStrategy
- {
- private boolean rolledBack = false;
- /*
- * Start
- */
-
- public void start()
+ catch (Throwable e)
{
- }
-
- /**
- * Error
- */
- public void error()
- {
- if (trace)
+ log.error("Failed to deliver message", e);
+ //we need to call before/afterDelivery as a pair
+ if(beforeDelivery)
{
- log.trace("error()");
- }
-
- if (session != null)
- {
try
{
- session.rollback();
- rolledBack = true;
+ endpoint.afterDelivery();
}
- catch (HornetQException e)
+ catch (ResourceException e1)
{
- log.error("Failed to rollback session transaction", e);
+ log.warn("Unable to call after delivery");
}
}
- }
-
- /**
- * End
- */
- public void end()
- {
- if (trace)
+ if(useLocalTx)
{
- log.trace("end()");
- }
-
- if (!rolledBack)
- {
- if (session != null)
- {
- try
- {
- session.commit();
- }
- catch (HornetQException e)
- {
- log.error("Failed to commit session transaction", e);
- }
- }
- }
- }
- }
-
- /**
- * XA demarcation strategy
- */
- private class XATransactionDemarcationStrategy implements TransactionDemarcationStrategy
- {
- private final TransactionManager tm = activation.getTransactionManager();
-
- private Transaction trans;
-
- public void start() throws Throwable
- {
- final int timeout = activation.getActivationSpec().getTransactionTimeout();
-
- if (timeout > 0)
- {
- if (trace)
- {
- log.trace("Setting transactionTimeout for JMSSessionPool to " + timeout);
- }
-
- tm.setTransactionTimeout(timeout);
- }
-
- tm.begin();
-
- try
- {
- trans = tm.getTransaction();
-
- if (trace)
- {
- log.trace(this + " using tx=" + trans);
- }
-
- if (!trans.enlistResource(session))
- {
- throw new JMSException("could not enlist resource");
- }
- if (trace)
- {
- log.trace(this + " XAResource '" + session + " enlisted.");
- }
-
- }
- catch (Throwable t)
- {
try
{
- tm.rollback();
+ session.rollback();
}
- catch (Throwable ignored)
+ catch (HornetQException e1)
{
- log.trace(this + " ignored error rolling back after failed enlist", ignored);
+ log.warn("Unable to roll local transaction back");
}
- throw t;
}
}
- public void error()
- {
- // Mark for tollback TX via TM
- try
- {
- if (trace)
- {
- log.trace(this + " using TM to mark TX for rollback tx=" + trans);
- }
-
- trans.setRollbackOnly();
- }
- catch (Throwable t)
- {
- log.error(this + " failed to set rollback only", t);
- }
- }
-
- public void end()
- {
- try
- {
- // Use the TM to commit the Tx (assert the correct association)
- Transaction currentTx = tm.getTransaction();
- if (!trans.equals(currentTx))
- {
- throw new IllegalStateException("Wrong tx association: expected " + trans + " was " + currentTx);
- }
-
- // Marked rollback
- if (trans.getStatus() == Status.STATUS_MARKED_ROLLBACK)
- {
- if (trace)
- {
- log.trace(this + " rolling back JMS transaction tx=" + trans);
- }
-
- // Actually roll it back
- tm.rollback();
-
- }
- else if (trans.getStatus() == Status.STATUS_ACTIVE)
- {
- // Commit tx
- // This will happen if
- // a) everything goes well
- // b) app. exception was thrown
- if (trace)
- {
- log.trace(this + " commiting the JMS transaction tx=" + trans);
- }
-
- tm.commit();
-
- }
- else
- {
- tm.suspend();
- }
- }
- catch (Throwable t)
- {
- log.error(this + " failed to commit/rollback", t);
- }
- }
}
- private class NoTXTransactionDemarcationStrategy implements TransactionDemarcationStrategy
- {
- public void start() throws Throwable
- {
- }
-
- public void error()
- {
- }
-
- public void end()
- {
- }
- }
}
16 years, 3 months