Author: clebert.suconic(a)jboss.com
Date: 2011-08-11 12:56:03 -0400 (Thu, 11 Aug 2011)
New Revision: 11186
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/remoting/DirectDeliverTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
Log:
Dealing with a dead lock that happened on the testsuite
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2011-08-10
17:30:15 UTC (rev 11185)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/management/impl/QueueControlImpl.java 2011-08-11
16:56:03 UTC (rev 11186)
@@ -400,7 +400,7 @@
{
Filter filter = FilterImpl.createFilter(filterStr);
List<Map<String, Object>> messages = new ArrayList<Map<String,
Object>>();
- queue.blockOnExecutorFuture();
+ queue.flushExecutor();
LinkedListIterator<MessageReference> iterator = queue.iterator();
try
{
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java 2011-08-10
17:30:15 UTC (rev 11185)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java 2011-08-11
16:56:03 UTC (rev 11186)
@@ -167,7 +167,7 @@
void resetAllIterators();
- boolean blockOnExecutorFuture();
+ boolean flushExecutor();
void close() throws Exception;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-08-10
17:30:15 UTC (rev 11185)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-08-11
16:56:03 UTC (rev 11186)
@@ -383,7 +383,7 @@
{
// We must block on the executor to ensure any async deliveries have
completed or we might get out of order
// deliveries
- if (blockOnExecutorFuture())
+ if (flushExecutor())
{
// Go into direct delivery mode
directDeliver = true;
@@ -443,7 +443,20 @@
checkQueueSizeFuture.cancel(false);
}
- cancelRedistributor();
+ getExecutor().execute(new Runnable(){
+ public void run()
+ {
+ try
+ {
+ cancelRedistributor();
+ }
+ catch (Exception e)
+ {
+ // nothing that could be done anyway.. just logging
+ log.warn(e.getMessage(), e);
+ }
+ }
+ });
}
public Executor getExecutor()
@@ -464,14 +477,14 @@
{
deliverAsync();
- blockOnExecutorFuture();
+ flushExecutor();
}
- public boolean blockOnExecutorFuture()
+ public boolean flushExecutor()
{
Future future = new Future();
- executor.execute(future);
+ getExecutor().execute(future);
boolean ok = future.await(10000);
@@ -1137,28 +1150,43 @@
}
}
- public synchronized void expireReferences() throws Exception
+ public void expireReferences() throws Exception
{
- LinkedListIterator<MessageReference> iter = iterator();
-
- try
- {
- while (iter.hasNext())
+ getExecutor().execute(new Runnable(){
+ public void run()
{
- MessageReference ref = iter.next();
- if (ref.getMessage().isExpired())
+ synchronized (QueueImpl.this)
{
- deliveringCount.incrementAndGet();
- expire(ref);
- iter.remove();
- refRemoved(ref);
+ LinkedListIterator<MessageReference> iter = iterator();
+
+ try
+ {
+ while (iter.hasNext())
+ {
+ MessageReference ref = iter.next();
+ try
+ {
+ if (ref.getMessage().isExpired())
+ {
+ deliveringCount.incrementAndGet();
+ expire(ref);
+ iter.remove();
+ refRemoved(ref);
+ }
+ }
+ catch (Exception e)
+ {
+ log.warn("Error expiring reference " + ref, e);
+ }
+ }
+ }
+ finally
+ {
+ iter.close();
+ }
}
}
- }
- finally
- {
- iter.close();
- }
+ });
}
public synchronized boolean sendMessageToDeadLetterAddress(final long messageID)
throws Exception
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/remoting/DirectDeliverTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/remoting/DirectDeliverTest.java 2011-08-10
17:30:15 UTC (rev 11185)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/remoting/DirectDeliverTest.java 2011-08-11
16:56:03 UTC (rev 11186)
@@ -134,7 +134,7 @@
prod.send(msg);
}
- queue.blockOnExecutorFuture();
+ queue.flushExecutor();
//Consumer is not started so should go queued
assertFalse(queue.isDirectDeliver());
@@ -157,7 +157,7 @@
prod.send(msg);
- queue.blockOnExecutorFuture();
+ queue.flushExecutor();
assertTrue(queue.isDirectDeliver());
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2011-08-10
17:30:15 UTC (rev 11185)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2011-08-11
16:56:03 UTC (rev 11186)
@@ -82,7 +82,7 @@
}
- public boolean blockOnExecutorFuture()
+ public boolean flushExecutor()
{
return true;
}
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java 2011-08-10
17:30:15 UTC (rev 11185)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java 2011-08-11
16:56:03 UTC (rev 11186)
@@ -617,7 +617,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.server.Queue#blockOnExecutorFuture()
*/
- public boolean blockOnExecutorFuture()
+ public boolean flushExecutor()
{
// TODO Auto-generated method stub
return false;
Show replies by date