Author: clebert.suconic(a)jboss.com
Date: 2009-11-19 14:53:49 -0500 (Thu, 19 Nov 2009)
New Revision: 8325
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/paging/PagingStore.java
branches/ClebertTemporary/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/server/LargeServerMessage.java
branches/ClebertTemporary/src/main/org/hornetq/core/server/Queue.java
branches/ClebertTemporary/src/main/org/hornetq/core/server/ServerMessage.java
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/LastValueQueue.java
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/ClebertTemporary/src/main/org/hornetq/core/transaction/TransactionOperation.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
Log:
Removing recursive IO calls on the callbacks
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-11-19
17:50:33 UTC (rev 8324)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-11-19
19:53:49 UTC (rev 8325)
@@ -1024,6 +1024,7 @@
}
catch (HornetQException e)
{
+ log.warn(e.getMessage(), e);
// This should never occur
throw new XAException(XAException.XAER_RMERR);
}
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/paging/PagingStore.java 2009-11-19
17:50:33 UTC (rev 8324)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/paging/PagingStore.java 2009-11-19
19:53:49 UTC (rev 8325)
@@ -67,14 +67,14 @@
* @param message
* @throws Exception
*/
- void addSize(ServerMessage message, boolean add) throws Exception;
+ void addSize(ServerMessage message, boolean add);
/**
*
* @param reference
* @throws Exception
*/
- void addSize(MessageReference reference, boolean add) throws Exception;
+ void addSize(MessageReference reference, boolean add);
/**
*
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-11-19
17:50:33 UTC (rev 8324)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-11-19
19:53:49 UTC (rev 8325)
@@ -281,7 +281,7 @@
checkReleaseProducerFlowControlCredits(-credits);
}
- public void addSize(final ServerMessage message, final boolean add) throws Exception
+ public void addSize(final ServerMessage message, final boolean add)
{
long size = message.getMemoryEstimate();
@@ -299,7 +299,7 @@
}
}
- public void addSize(final MessageReference reference, final boolean add) throws
Exception
+ public void addSize(final MessageReference reference, final boolean add)
{
long size = MessageReferenceImpl.getMemoryEstimate();
@@ -479,7 +479,7 @@
}
}
- public boolean startPaging() throws Exception
+ public boolean startPaging()
{
if (!running)
{
@@ -510,7 +510,17 @@
{
if (currentPage == null)
{
- openNewPage();
+ try
+ {
+ openNewPage();
+ }
+ catch (Exception e)
+ {
+ // If not possible to starting page due to an IO error, we will just
consider it non paging.
+ // This shouldn't happen anyway
+ log.warn("IO Error, impossible to start paging", e);
+ return false;
+ }
return true;
}
@@ -701,7 +711,7 @@
}
}
- private void addSize(final long size) throws Exception
+ private void addSize(final long size)
{
if (addressFullMessagePolicy != AddressFullMessagePolicy.PAGE)
{
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java 2009-11-19
17:50:33 UTC (rev 8324)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java 2009-11-19
19:53:49 UTC (rev 8325)
@@ -176,7 +176,7 @@
this.delayDeletionCount.incrementAndGet();
}
- public synchronized void decrementDelayDeletionCount() throws Exception
+ public synchronized void decrementDelayDeletionCount()
{
int count = this.delayDeletionCount.decrementAndGet();
@@ -191,7 +191,7 @@
return new DecodingContext();
}
- private void checkDelete() throws Exception
+ private void checkDelete()
{
if (getRefCount() <= 0)
{
@@ -220,7 +220,7 @@
}
@Override
- public synchronized int decrementRefCount(MessageReference reference) throws
Exception
+ public synchronized int decrementRefCount(MessageReference reference)
{
int currentRefCount = super.decrementRefCount(reference);
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-19
17:50:33 UTC (rev 8324)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-19
19:53:49 UTC (rev 8325)
@@ -1865,11 +1865,11 @@
}
}
- public void afterPrepare(final Transaction tx) throws Exception
+ public void afterPrepare(final Transaction tx)
{
}
- public void afterRollback(final Transaction tx) throws Exception
+ public void afterRollback(final Transaction tx)
{
PageTransactionInfo pageTransaction =
(PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2009-11-19
17:50:33 UTC (rev 8324)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageLargeServerMessage.java 2009-11-19
19:53:49 UTC (rev 8325)
@@ -138,7 +138,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.server.LargeServerMessage#decrementDelayDeletionCount()
*/
- public void decrementDelayDeletionCount() throws Exception
+ public void decrementDelayDeletionCount()
{
}
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java 2009-11-19
17:50:33 UTC (rev 8324)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java 2009-11-19
19:53:49 UTC (rev 8325)
@@ -211,7 +211,7 @@
this.recordID = recordID;
}
- private void process() throws Exception
+ private void process()
{
if (!done)
{
@@ -246,17 +246,17 @@
{
}
- public void afterCommit(final Transaction tx) throws Exception
+ public void afterCommit(final Transaction tx)
{
process();
}
- public void afterPrepare(final Transaction tx) throws Exception
+ public void afterPrepare(final Transaction tx)
{
process();
}
- public void afterRollback(final Transaction tx) throws Exception
+ public void afterRollback(final Transaction tx)
{
}
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-19
17:50:33 UTC (rev 8324)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-19
19:53:49 UTC (rev 8325)
@@ -1114,11 +1114,11 @@
}
}
- public void afterPrepare(final Transaction tx) throws Exception
+ public void afterPrepare(final Transaction tx)
{
}
- public void afterRollback(final Transaction tx) throws Exception
+ public void afterRollback(final Transaction tx)
{
PageTransactionInfo pageTransaction =
(PageTransactionInfo)tx.getProperty(TransactionPropertyIndexes.PAGE_TRANSACTION);
@@ -1225,11 +1225,11 @@
}
}
- public void afterPrepare(Transaction tx) throws Exception
+ public void afterPrepare(Transaction tx)
{
}
- public void afterRollback(Transaction tx) throws Exception
+ public void afterRollback(Transaction tx)
{
}
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/server/LargeServerMessage.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/server/LargeServerMessage.java 2009-11-19
17:50:33 UTC (rev 8324)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/server/LargeServerMessage.java 2009-11-19
19:53:49 UTC (rev 8325)
@@ -42,5 +42,5 @@
void incrementDelayDeletionCount();
- void decrementDelayDeletionCount() throws Exception;
+ void decrementDelayDeletionCount();
}
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- branches/ClebertTemporary/src/main/org/hornetq/core/server/Queue.java 2009-11-19
17:50:33 UTC (rev 8324)
+++ branches/ClebertTemporary/src/main/org/hornetq/core/server/Queue.java 2009-11-19
19:53:49 UTC (rev 8325)
@@ -125,6 +125,10 @@
Collection<Consumer> getConsumers();
+ /** We can't execute IO operation when inside the IOCallback /
TransactionCallback.
+ * This method will will perform IO operations in a second thread */
+ boolean checkDLQ(MessageReference ref, Executor ioExecutor) throws Exception;
+
boolean checkDLQ(MessageReference ref) throws Exception;
void lockDelivery();
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/server/ServerMessage.java 2009-11-19
17:50:33 UTC (rev 8324)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/server/ServerMessage.java 2009-11-19
19:53:49 UTC (rev 8325)
@@ -33,7 +33,7 @@
int incrementRefCount(MessageReference reference) throws Exception;
- int decrementRefCount(MessageReference reference) throws Exception;
+ int decrementRefCount(MessageReference reference);
int incrementDurableRefCount();
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-19
17:50:33 UTC (rev 8324)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-19
19:53:49 UTC (rev 8325)
@@ -1025,7 +1025,7 @@
configuration.getManagementClusterPassword(),
managementService);
- queueFactory = new QueueFactoryImpl(scheduledPool, addressSettingsRepository,
storageManager);
+ queueFactory = new QueueFactoryImpl(executorFactory, scheduledPool,
addressSettingsRepository, storageManager);
pagingManager = createPagingManager();
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2009-11-19
17:50:33 UTC (rev 8324)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2009-11-19
19:53:49 UTC (rev 8325)
@@ -14,6 +14,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.hornetq.core.filter.Filter;
@@ -50,6 +51,7 @@
final Filter filter,
final boolean durable,
final boolean temporary,
+ final Executor executor,
final ScheduledExecutorService scheduledExecutor,
final PostOffice postOffice,
final StorageManager storageManager,
@@ -61,6 +63,7 @@
filter,
durable,
temporary,
+ executor,
scheduledExecutor,
postOffice,
storageManager,
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java 2009-11-19
17:50:33 UTC (rev 8324)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java 2009-11-19
19:53:49 UTC (rev 8325)
@@ -22,6 +22,7 @@
import org.hornetq.core.server.QueueFactory;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
+import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.SimpleString;
/**
@@ -42,11 +43,16 @@
private PostOffice postOffice;
private final StorageManager storageManager;
+
+ private final ExecutorFactory executorFactory;
- public QueueFactoryImpl(final ScheduledExecutorService scheduledExecutor,
+ public QueueFactoryImpl(final ExecutorFactory executorFactory,
+ final ScheduledExecutorService scheduledExecutor,
final HierarchicalRepository<AddressSettings>
addressSettingsRepository,
final StorageManager storageManager)
{
+ this.executorFactory = executorFactory;
+
this.addressSettingsRepository = addressSettingsRepository;
this.scheduledExecutor = scheduledExecutor;
@@ -77,6 +83,7 @@
filter,
durable,
temporary,
+ executorFactory.getExecutor(),
scheduledExecutor,
postOffice,
storageManager,
@@ -90,6 +97,7 @@
filter,
durable,
temporary,
+ executorFactory.getExecutor(),
scheduledExecutor,
postOffice,
storageManager,
Modified: branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-19
17:50:33 UTC (rev 8324)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-19
19:53:49 UTC (rev 8325)
@@ -114,6 +114,9 @@
private final HierarchicalRepository<AddressSettings>
addressSettingsRepository;
private final ScheduledExecutorService scheduledExecutor;
+
+ /** We can't perform any operation on the journal while inside the Transactional
operations. */
+ private final Executor journalExecutor;
private final SimpleString address;
@@ -139,6 +142,7 @@
final Filter filter,
final boolean durable,
final boolean temporary,
+ final Executor executor,
final ScheduledExecutorService scheduledExecutor,
final PostOffice postOffice,
final StorageManager storageManager,
@@ -163,6 +167,8 @@
this.addressSettingsRepository = addressSettingsRepository;
this.scheduledExecutor = scheduledExecutor;
+
+ this.journalExecutor = executor;
direct = true;
@@ -921,11 +927,36 @@
public boolean checkDLQ(final MessageReference reference) throws Exception
{
+ return checkDLQ(reference, null);
+ }
+
+ public boolean checkDLQ(final MessageReference reference, Executor ioExecutor) throws
Exception
+ {
ServerMessage message = reference.getMessage();
if (message.isDurable() && durable)
{
- storageManager.updateDeliveryCount(reference);
+ if (ioExecutor != null)
+ {
+ ioExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ storageManager.updateDeliveryCount(reference);
+ }
+ catch (Exception e)
+ {
+ log.warn("Can't update delivery count on checkDLQ",
e);
+ }
+ }
+ });
+ }
+ else
+ {
+ storageManager.updateDeliveryCount(reference);
+ }
}
AddressSettings addressSettings =
addressSettingsRepository.getMatch(address.toString());
@@ -934,7 +965,27 @@
if (maxDeliveries > 0 && reference.getDeliveryCount() >=
maxDeliveries)
{
- sendToDeadLetterAddress(reference);
+ if (ioExecutor != null)
+ {
+ ioExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ sendToDeadLetterAddress(reference);
+ }
+ catch (Exception e)
+ {
+ log.warn("Error on DLQ send", e);
+ }
+ }
+ });
+ }
+ else
+ {
+ sendToDeadLetterAddress(reference);
+ }
return false;
}
@@ -946,7 +997,27 @@
{
reference.setScheduledDeliveryTime(System.currentTimeMillis() +
redeliveryDelay);
- storageManager.updateScheduledDeliveryTime(reference);
+ if (ioExecutor != null)
+ {
+ ioExecutor.execute(new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ sendToDeadLetterAddress(reference);
+ }
+ catch (Exception e)
+ {
+ log.warn("Error on DLQ send", e);
+ }
+ }
+ });
+ }
+ else
+ {
+ storageManager.updateScheduledDeliveryTime(reference);
+ }
}
deliveringCount.decrementAndGet();
@@ -1377,7 +1448,7 @@
return status;
}
- private void removeExpiringReference(final MessageReference ref) throws Exception
+ private void removeExpiringReference(final MessageReference ref)
{
if (ref.getMessage().getExpiration() > 0)
{
@@ -1385,7 +1456,7 @@
}
}
- private void postAcknowledge(final MessageReference ref) throws Exception
+ private void postAcknowledge(final MessageReference ref)
{
ServerMessage message = ref.getMessage();
@@ -1427,7 +1498,7 @@
message.decrementRefCount(ref);
}
- void postRollback(final LinkedList<MessageReference> refs) throws Exception
+ void postRollback(final LinkedList<MessageReference> refs)
{
synchronized (this)
{
@@ -1477,29 +1548,38 @@
{
}
- public void afterPrepare(final Transaction tx) throws Exception
+ public void afterPrepare(final Transaction tx)
{
}
- public void afterRollback(final Transaction tx) throws Exception
+ public void afterRollback(final Transaction tx)
{
Map<QueueImpl, LinkedList<MessageReference>> queueMap = new
HashMap<QueueImpl, LinkedList<MessageReference>>();
for (MessageReference ref : refsToAck)
{
- if (ref.getQueue().checkDLQ(ref))
+ try
{
- LinkedList<MessageReference> toCancel =
queueMap.get(ref.getQueue());
-
- if (toCancel == null)
+ if (ref.getQueue().checkDLQ(ref, journalExecutor))
{
- toCancel = new LinkedList<MessageReference>();
-
- queueMap.put((QueueImpl)ref.getQueue(), toCancel);
+ LinkedList<MessageReference> toCancel =
queueMap.get(ref.getQueue());
+
+ if (toCancel == null)
+ {
+ toCancel = new LinkedList<MessageReference>();
+
+ queueMap.put((QueueImpl)ref.getQueue(), toCancel);
+ }
+
+ toCancel.addFirst(ref);
}
-
- toCancel.addFirst(ref);
}
+ catch (Exception e)
+ {
+ // checkDLQ here will be using an executor, this shouldn't happen
+ // don't you just hate checked exceptions in java?
+ log.warn("Error on checkDLQ", e);
+ }
}
for (Map.Entry<QueueImpl, LinkedList<MessageReference>> entry :
queueMap.entrySet())
@@ -1515,7 +1595,7 @@
}
}
- public void afterCommit(final Transaction tx) throws Exception
+ public void afterCommit(final Transaction tx)
{
for (MessageReference ref : refsToAck)
{
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-19
17:50:33 UTC (rev 8324)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-19
19:53:49 UTC (rev 8325)
@@ -119,7 +119,7 @@
return count;
}
- public int decrementRefCount(final MessageReference reference) throws Exception
+ public int decrementRefCount(final MessageReference reference)
{
int count = refCount.decrementAndGet();
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-19
17:50:33 UTC (rev 8324)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-19
19:53:49 UTC (rev 8325)
@@ -766,6 +766,7 @@
public void handleRollback(final RollbackMessage packet)
{
+ new Exception("Rollback").printStackTrace();
Packet response = null;
try
@@ -1078,6 +1079,7 @@
public void handleXARollback(final SessionXARollbackMessage packet)
{
+ System.out.println("XARollback");
Packet response = null;
Xid xid = packet.getXid();
Modified:
branches/ClebertTemporary/src/main/org/hornetq/core/transaction/TransactionOperation.java
===================================================================
---
branches/ClebertTemporary/src/main/org/hornetq/core/transaction/TransactionOperation.java 2009-11-19
17:50:33 UTC (rev 8324)
+++
branches/ClebertTemporary/src/main/org/hornetq/core/transaction/TransactionOperation.java 2009-11-19
19:53:49 UTC (rev 8325)
@@ -29,9 +29,9 @@
void beforeRollback(Transaction tx) throws Exception;
- void afterPrepare(Transaction tx) throws Exception;
+ void afterPrepare(Transaction tx);
- void afterCommit(Transaction tx) throws Exception;
+ void afterCommit(Transaction tx);
- void afterRollback(Transaction tx) throws Exception;
+ void afterRollback(Transaction tx);
}
Modified:
branches/ClebertTemporary/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java
===================================================================
---
branches/ClebertTemporary/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java 2009-11-19
17:50:33 UTC (rev 8324)
+++
branches/ClebertTemporary/tests/src/org/hornetq/tests/concurrent/server/impl/QueueTest.java 2009-11-19
19:53:49 UTC (rev 8325)
@@ -40,8 +40,20 @@
{
private static final Logger log = Logger.getLogger(QueueTest.class);
- private QueueFactory queueFactory = new FakeQueueFactory();
+ private FakeQueueFactory queueFactory = new FakeQueueFactory();
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ queueFactory = new FakeQueueFactory();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ queueFactory.stop();
+ super.tearDown();
+ }
+
/*
* Concurrent set consumer not busy, busy then, call deliver while messages are being
added and consumed
*/
Modified:
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java
===================================================================
---
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java 2009-11-19
17:50:33 UTC (rev 8324)
+++
branches/ClebertTemporary/tests/src/org/hornetq/tests/integration/xa/XaTimeoutTest.java 2009-11-19
19:53:49 UTC (rev 8325)
@@ -538,15 +538,15 @@
{
}
- public void afterPrepare(Transaction tx) throws Exception
+ public void afterPrepare(Transaction tx)
{
}
- public void afterCommit(Transaction tx) throws Exception
+ public void afterCommit(Transaction tx)
{
}
- public void afterRollback(Transaction tx) throws Exception
+ public void afterRollback(Transaction tx)
{
latch.countDown();
}
Modified:
branches/ClebertTemporary/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java
===================================================================
---
branches/ClebertTemporary/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java 2009-11-19
17:50:33 UTC (rev 8324)
+++
branches/ClebertTemporary/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java 2009-11-19
19:53:49 UTC (rev 8325)
@@ -15,6 +15,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -41,17 +43,23 @@
private static final Logger log = Logger.getLogger(QueueImplTest.class);
private ScheduledExecutorService scheduledExecutor;
+
+ private ExecutorService executor;
public void setUp() throws Exception
{
super.setUp();
scheduledExecutor = new ScheduledThreadPoolExecutor(1);
+
+ executor = Executors.newSingleThreadExecutor();
}
public void tearDown() throws Exception
{
scheduledExecutor.shutdownNow();
+
+ executor.shutdown();
super.tearDown();
}
@@ -70,7 +78,7 @@
public void testScheduledNoConsumer() throws Exception
{
- Queue queue = new QueueImpl(1, new SimpleString("address1"), new
SimpleString("queue1"), null, false, true, scheduledExecutor, null, null,
null);
+ Queue queue = new QueueImpl(1, new SimpleString("address1"), new
SimpleString("queue1"), null, false, true, executor, scheduledExecutor, null,
null, null);
//Send one scheduled
@@ -136,7 +144,7 @@
private void testScheduled(boolean direct) throws Exception
{
- Queue queue = new QueueImpl(1,new SimpleString("address1"), new
SimpleString("queue1"), null, false, true, scheduledExecutor, null, null,
null);
+ Queue queue = new QueueImpl(1,new SimpleString("address1"), new
SimpleString("queue1"), null, false, true, executor, scheduledExecutor, null,
null, null);
FakeConsumer consumer = null;
@@ -243,7 +251,7 @@
return HandleStatus.HANDLED;
}
};
- Queue queue = new QueueImpl(1, new SimpleString("address1"), queue1,
null, false, true, scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, new SimpleString("address1"), queue1,
null, false, true, executor, scheduledExecutor, null, null, null);
MessageReference messageReference = generateReference(queue, 1);
queue.addConsumer(consumer);
messageReference.setScheduledDeliveryTime(System.currentTimeMillis() + 2000);
Modified:
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
---
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-11-19
17:50:33 UTC (rev 8324)
+++
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-11-19
19:53:49 UTC (rev 8325)
@@ -1131,7 +1131,7 @@
return null;
}
- public int decrementRefCount(MessageReference reference) throws Exception
+ public int decrementRefCount(MessageReference reference)
{
// TODO Auto-generated method stub
return 0;
Modified:
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
---
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2009-11-19
17:50:33 UTC (rev 8324)
+++
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2009-11-19
19:53:49 UTC (rev 8325)
@@ -493,4 +493,13 @@
return false;
}
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.server.Queue#checkDLQ(org.hornetq.core.server.MessageReference,
java.util.concurrent.Executor)
+ */
+ public boolean checkDLQ(MessageReference ref, Executor ioExecutor) throws Exception
+ {
+ // TODO Auto-generated method stub
+ return false;
+ }
+
}
\ No newline at end of file
Modified:
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
---
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2009-11-19
17:50:33 UTC (rev 8324)
+++
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2009-11-19
19:53:49 UTC (rev 8325)
@@ -17,6 +17,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -43,7 +44,23 @@
{
// The tests ----------------------------------------------------------------
- private final ScheduledExecutorService scheduledExecutor =
Executors.newSingleThreadScheduledExecutor();
+ private ScheduledExecutorService scheduledExecutor;
+
+ private ExecutorService executor;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
+ executor = Executors.newSingleThreadExecutor();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ scheduledExecutor.shutdown();
+ executor.shutdown();
+ super.tearDown();
+ }
private static final SimpleString queue1 = new SimpleString("queue1");
@@ -53,18 +70,18 @@
{
final SimpleString name = new SimpleString("oobblle");
- Queue queue = new QueueImpl(1, address1, name, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, name, null, false, true, executor,
scheduledExecutor, null, null, null);
assertEquals(name, queue.getName());
}
public void testDurable()
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, false,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, false, executor,
scheduledExecutor, null, null, null);
assertFalse(queue.isDurable());
- queue = new QueueImpl(1, address1, queue1, null, true, false, scheduledExecutor,
null, null, null);
+ queue = new QueueImpl(1, address1, queue1, null, true, false, executor,
scheduledExecutor, null, null, null);
assertTrue(queue.isDurable());
}
@@ -77,7 +94,7 @@
Consumer cons3 = new FakeConsumer();
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
assertEquals(0, queue.getConsumerCount());
@@ -118,7 +135,7 @@
public void testGetFilter()
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
assertNull(queue.getFilter());
@@ -135,7 +152,7 @@
}
};
- queue = new QueueImpl(1, address1, queue1, filter, false, true, scheduledExecutor,
null, null, null);
+ queue = new QueueImpl(1, address1, queue1, filter, false, true, executor,
scheduledExecutor, null, null, null);
assertEquals(filter, queue.getFilter());
@@ -143,7 +160,7 @@
public void testSimpleadd()
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
final int numMessages = 10;
@@ -162,7 +179,7 @@
public void testSimpleDirectDelivery() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
FakeConsumer consumer = new FakeConsumer();
@@ -190,7 +207,7 @@
public void testSimpleNonDirectDelivery() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
final int numMessages = 10;
@@ -228,7 +245,7 @@
public void testBusyConsumer() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
FakeConsumer consumer = new FakeConsumer();
@@ -272,7 +289,7 @@
public void testBusyConsumerThenAddMoreMessages() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
FakeConsumer consumer = new FakeConsumer();
@@ -339,7 +356,7 @@
public void testAddFirstadd() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
final int numMessages = 10;
@@ -399,7 +416,7 @@
null,
false,
true,
- scheduledExecutor,
+ executor, scheduledExecutor,
new FakePostOffice(),
null,
null);
@@ -556,7 +573,7 @@
public void testConsumerReturningNull() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
class NullConsumer implements Consumer
{
@@ -589,7 +606,7 @@
public void testRoundRobinWithQueueing() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
final int numMessages = 10;
@@ -632,7 +649,7 @@
public void testRoundRobinDirect() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
final int numMessages = 10;
@@ -673,7 +690,7 @@
public void testWithPriorities() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
final int numMessages = 10;
@@ -740,7 +757,7 @@
public void testConsumerWithFilterAddAndRemove()
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
Filter filter = new FakeFilter("fruit", "orange");
@@ -749,7 +766,7 @@
public void testList()
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
final int numMessages = 20;
@@ -773,7 +790,7 @@
public void testListWithFilter()
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
final int numMessages = 20;
@@ -815,7 +832,7 @@
null,
false,
true,
- scheduledExecutor,
+ executor, scheduledExecutor,
new FakePostOffice(),
null,
null);
@@ -887,7 +904,7 @@
public void testBusyConsumerWithFilterFirstCallBusy() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
FakeConsumer consumer = new FakeConsumer(FilterImpl.createFilter("color =
'green'"));
@@ -928,7 +945,7 @@
public void testBusyConsumerWithFilterThenAddMoreMessages() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
FakeConsumer consumer = new FakeConsumer(FilterImpl.createFilter("color =
'green'"));
@@ -1002,7 +1019,7 @@
public void testConsumerWithFilterThenAddMoreMessages() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
final int numMessages = 10;
List<MessageReference> refs = new ArrayList<MessageReference>();
@@ -1072,7 +1089,7 @@
null,
false,
true,
- scheduledExecutor,
+ executor, scheduledExecutor,
new FakePostOffice(),
null,
null);
@@ -1164,7 +1181,7 @@
public void testMessageOrder() throws Exception
{
FakeConsumer consumer = new FakeConsumer();
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
@@ -1184,7 +1201,7 @@
public void testMessagesAdded() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
@@ -1196,7 +1213,7 @@
public void testGetReference() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
@@ -1209,7 +1226,7 @@
public void testGetNonExistentReference() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
@@ -1226,7 +1243,7 @@
*/
public void testPauseAndResumeWithAsync() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
// pauses the queue
queue.pause();
@@ -1281,7 +1298,7 @@
public void testPauseAndResumeWithDirect() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
// Now add a consumer
FakeConsumer consumer = new FakeConsumer();
Modified:
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
===================================================================
---
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java 2009-11-19
17:50:33 UTC (rev 8324)
+++
branches/ClebertTemporary/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java 2009-11-19
19:53:49 UTC (rev 8325)
@@ -13,6 +13,7 @@
package org.hornetq.tests.unit.core.server.impl.fakes;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -32,14 +33,16 @@
*/
public class FakeQueueFactory implements QueueFactory
{
- private final ScheduledExecutorService scheduledExecutor =
Executors.newSingleThreadScheduledExecutor();
-
+ private final ScheduledExecutorService scheduledExecutor =
Executors.newSingleThreadScheduledExecutor();
+
+ private final ExecutorService executor = Executors.newSingleThreadExecutor();
+
private PostOffice postOffice;
public Queue createQueue(long persistenceID, final SimpleString address, SimpleString
name, Filter filter,
boolean durable, boolean temporary)
{
- return new QueueImpl(persistenceID, address, name, filter, durable, temporary,
scheduledExecutor, postOffice, null, null);
+ return new QueueImpl(persistenceID, address, name, filter, durable, temporary,
executor, scheduledExecutor, postOffice, null, null);
}
public void setPostOffice(PostOffice postOffice)
@@ -47,5 +50,12 @@
this.postOffice = postOffice;
}
+
+ public void stop() throws Exception
+ {
+ scheduledExecutor.shutdown();
+
+ executor.shutdown();
+ }
}