Author: clebert.suconic(a)jboss.com
Date: 2009-11-22 21:04:55 -0500 (Sun, 22 Nov 2009)
New Revision: 8369
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/management/ManagementService.java
branches/ClebertCallback/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/persistence/OperationContext.java
branches/ClebertCallback/src/main/org/hornetq/core/persistence/StorageManager.java
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/server/Queue.java
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/LastValueQueue.java
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/largemessage/ServerLargeMessageTest.java
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
branches/ClebertCallback/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java
branches/ClebertCallback/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
branches/ClebertCallback/tests/src/org/hornetq/tests/util/UnitTestCase.java
Log:
Reverting unnecessary executors..
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/management/ManagementService.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/management/ManagementService.java 2009-11-22
18:49:03 UTC (rev 8368)
+++
branches/ClebertCallback/src/main/org/hornetq/core/management/ManagementService.java 2009-11-23
02:04:55 UTC (rev 8369)
@@ -71,6 +71,8 @@
ObjectNameBuilder getObjectNameBuilder();
// Resource Registration
+
+ void setStorageManager(StorageManager storageManager);
HornetQServerControlImpl registerServer(PostOffice postOffice,
StorageManager storageManager,
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-11-22
18:49:03 UTC (rev 8368)
+++
branches/ClebertCallback/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-11-23
02:04:55 UTC (rev 8369)
@@ -59,7 +59,6 @@
import org.hornetq.core.messagecounter.impl.MessageCounterManagerImpl;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.remoting.server.RemotingService;
import org.hornetq.core.remoting.spi.Acceptor;
@@ -181,6 +180,11 @@
{
return messageCounterManager;
}
+
+ public void setStorageManager(StorageManager storageManager)
+ {
+ this.storageManager = storageManager;
+ }
public HornetQServerControlImpl registerServer(final PostOffice postOffice,
final StorageManager storageManager,
@@ -741,28 +745,17 @@
}
}
- // TODO: Talk to Andy and Jeff about a better way to sync this...
- System.out.println("Waiting");
- final CountDownLatch latch = new CountDownLatch(1);
- OperationContextImpl.getInstance().executeOnCompletion(new IOAsyncTask()
+ if (storageManager != null)
{
-
- public void done()
- {
- System.out.println("Done on management");
- latch.countDown();
- }
-
- public void onError(int errorCode, String errorMessage)
- {
- }
-
- });
-
- OperationContextImpl.getInstance().complete();
-
- latch.await(5, TimeUnit.SECONDS);
- System.out.println("Done");
+ System.out.println("Waiting on management...");
+ storageManager.waitOnOperations(managementRequestTimeout);
+ storageManager.clearContext();
+ System.out.println("Done");
+ }
+ else
+ {
+ new Exception("storagemanager is null, can't wait on
operations").printStackTrace();
+ }
}
public void enableNotifications(boolean enabled)
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/persistence/OperationContext.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/persistence/OperationContext.java 2009-11-22
18:49:03 UTC (rev 8368)
+++
branches/ClebertCallback/src/main/org/hornetq/core/persistence/OperationContext.java 2009-11-23
02:04:55 UTC (rev 8369)
@@ -30,11 +30,6 @@
public interface OperationContext extends IOCompletion
{
- boolean hasReplication();
-
- /** Reattach the context to the current thread */
- void reinstall();
-
/** The executor used on responses.
* If this is not set, it will use the current thread. */
void setExecutor(Executor executor);
@@ -49,8 +44,5 @@
/** To be called when there are no more operations pending */
void complete();
-
- /** Is this a special operation to sync replication. */
- boolean isSync();
}
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/persistence/StorageManager.java 2009-11-22
18:49:03 UTC (rev 8368)
+++
branches/ClebertCallback/src/main/org/hornetq/core/persistence/StorageManager.java 2009-11-23
02:04:55 UTC (rev 8369)
@@ -52,9 +52,12 @@
/** Get the context associated with the thread for later reuse */
OperationContext getContext();
- /** It just creates an OperationContext without associating it to any threads */
+ /** It just creates an OperationContext without associating it */
OperationContext newContext(Executor executor);
+ /** Set the context back to the thread */
+ void setContext(OperationContext context);
+
// Message related operations
void pageClosed(SimpleString storeName, int pageNumber);
@@ -77,6 +80,8 @@
/** To close the OperationsContext */
void completeOperations();
+
+ void clearContext();
UUID getPersistentID();
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-22
18:49:03 UTC (rev 8368)
+++
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-23
02:04:55 UTC (rev 8369)
@@ -78,6 +78,7 @@
import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.utils.DataConstants;
+import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.Pair;
import org.hornetq.utils.SimpleString;
import org.hornetq.utils.UUID;
@@ -93,6 +94,7 @@
*/
public class JournalStorageManager implements StorageManager
{
+
private static final Logger log = Logger.getLogger(JournalStorageManager.class);
private static final long CHECKPOINT_BATCH_SIZE = Integer.MAX_VALUE;
@@ -144,6 +146,9 @@
private final SequentialFileFactory largeMessagesFactory;
private volatile boolean started;
+
+ /** Used to create Operation Contexts */
+ private final ExecutorFactory executorFactory;
private final Executor executor;
@@ -163,15 +168,17 @@
private final String largeMessagesDirectory;
- public JournalStorageManager(final Configuration config, final Executor executor)
+ public JournalStorageManager(final Configuration config, final ExecutorFactory
executorFactory)
{
- this(config, executor, null);
+ this(config, executorFactory, null);
}
- public JournalStorageManager(final Configuration config, final Executor executor,
final ReplicationManager replicator)
+ public JournalStorageManager(final Configuration config, final ExecutorFactory
executorFactory, final ReplicationManager replicator)
{
- this.executor = executor;
+ this.executorFactory = executorFactory;
+ this.executor = executorFactory.getExecutor();
+
this.replicator = replicator;
if (config.getJournalType() != JournalType.NIO && config.getJournalType()
!= JournalType.ASYNCIO)
@@ -295,8 +302,13 @@
*/
public void completeOperations()
{
- OperationContextImpl.getInstance().complete();
+ getContext().complete();
}
+
+ public void clearContext()
+ {
+ OperationContextImpl.clearContext();
+ }
public boolean isReplicated()
{
@@ -373,8 +385,13 @@
*/
public OperationContext getContext()
{
- return OperationContextImpl.getInstance();
+ return OperationContextImpl.getContext(executorFactory);
}
+
+ public void setContext(OperationContext context)
+ {
+ OperationContextImpl.setContext(context);
+ }
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#newContext()
@@ -386,7 +403,7 @@
public void afterCompleteOperations(IOAsyncTask run)
{
- OperationContextImpl.getInstance().executeOnCompletion(run);
+ getContext().executeOnCompletion(run);
}
public UUID getPersistentID()
@@ -469,27 +486,27 @@
messageJournal.appendAddRecord(message.getMessageID(),
ADD_LARGE_MESSAGE,
new
LargeMessageEncoding((LargeServerMessage)message),
- false, getIOContext());
+ false, getContext());
}
else
{
- messageJournal.appendAddRecord(message.getMessageID(), ADD_MESSAGE, message,
false, getIOContext());
+ messageJournal.appendAddRecord(message.getMessageID(), ADD_MESSAGE, message,
false, getContext());
}
}
public void storeReference(final long queueID, final long messageID) throws Exception
{
- messageJournal.appendUpdateRecord(messageID, ADD_REF, new RefEncoding(queueID),
syncNonTransactional, getIOContext());
+ messageJournal.appendUpdateRecord(messageID, ADD_REF, new RefEncoding(queueID),
syncNonTransactional, getContext());
}
public void storeAcknowledge(final long queueID, final long messageID) throws
Exception
{
- messageJournal.appendUpdateRecord(messageID, ACKNOWLEDGE_REF, new
RefEncoding(queueID), syncNonTransactional, getIOContext());
+ messageJournal.appendUpdateRecord(messageID, ACKNOWLEDGE_REF, new
RefEncoding(queueID), syncNonTransactional, getContext());
}
public void deleteMessage(final long messageID) throws Exception
{
- messageJournal.appendDeleteRecord(messageID, syncNonTransactional,
getIOContext());
+ messageJournal.appendDeleteRecord(messageID, syncNonTransactional, getContext());
}
public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception
@@ -500,19 +517,19 @@
messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
SET_SCHEDULED_DELIVERY_TIME,
encoding,
- syncNonTransactional, getIOContext());
+ syncNonTransactional, getContext());
}
public void storeDuplicateID(final SimpleString address, final byte[] duplID, final
long recordID) throws Exception
{
DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
- messageJournal.appendAddRecord(recordID, DUPLICATE_ID, encoding,
syncNonTransactional, getIOContext());
+ messageJournal.appendAddRecord(recordID, DUPLICATE_ID, encoding,
syncNonTransactional, getContext());
}
public void deleteDuplicateID(long recordID) throws Exception
{
- messageJournal.appendDeleteRecord(recordID, syncNonTransactional, getIOContext());
+ messageJournal.appendDeleteRecord(recordID, syncNonTransactional, getContext());
}
// Transactional operations
@@ -568,13 +585,13 @@
public long storeHeuristicCompletion(Xid xid, boolean isCommit) throws Exception
{
long id = generateUniqueID();
- messageJournal.appendAddRecord(id, HEURISTIC_COMPLETION, new
HeuristicCompletionEncoding(xid, isCommit), true, getIOContext());
+ messageJournal.appendAddRecord(id, HEURISTIC_COMPLETION, new
HeuristicCompletionEncoding(xid, isCommit), true, getContext());
return id;
}
public void deleteHeuristicCompletion(long id) throws Exception
{
- messageJournal.appendDeleteRecord(id, true, getIOContext());
+ messageJournal.appendDeleteRecord(id, true, getContext());
}
public void deletePageTransactional(final long txID, final long recordID) throws
Exception
@@ -600,17 +617,17 @@
public void prepare(final long txID, final Xid xid) throws Exception
{
- messageJournal.appendPrepareRecord(txID, new XidEncoding(xid), syncTransactional,
getIOContext());
+ messageJournal.appendPrepareRecord(txID, new XidEncoding(xid), syncTransactional,
getContext());
}
public void commit(final long txID) throws Exception
{
- messageJournal.appendCommitRecord(txID, syncTransactional, getIOContext());
+ messageJournal.appendCommitRecord(txID, syncTransactional, getContext());
}
public void rollback(final long txID) throws Exception
{
- messageJournal.appendRollbackRecord(txID, syncTransactional, getIOContext());
+ messageJournal.appendRollbackRecord(txID, syncTransactional, getContext());
}
public void storeDuplicateIDTransactional(final long txID,
@@ -648,7 +665,7 @@
messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
UPDATE_DELIVERY_COUNT,
updateInfo,
- syncNonTransactional, getIOContext());
+ syncNonTransactional, getContext());
}
private static final class AddMessageRecord
@@ -1394,11 +1411,6 @@
// Private
----------------------------------------------------------------------------------
- private IOCompletion getIOContext()
- {
- return OperationContextImpl.getInstance();
- }
-
private void checkAndCreateDir(final String dir, final boolean create)
{
File f = new File(dir);
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-11-22
18:49:03 UTC (rev 8368)
+++
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-11-23
02:04:55 UTC (rev 8369)
@@ -21,9 +21,8 @@
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.persistence.OperationContext;
+import org.hornetq.utils.ExecutorFactory;
-import sun.security.util.PendingException;
-
/**
*
* This class will hold operations when there are IO operations...
@@ -33,24 +32,31 @@
*/
public class OperationContextImpl implements OperationContext
{
- private static final ThreadLocal<OperationContext> tlContext = new
ThreadLocal<OperationContext>();
+
+ private static final ThreadLocal<OperationContext> threadLocalContext = new
ThreadLocal<OperationContext>();
- public static void setInstance(OperationContext context)
+ public static void clearContext()
{
- tlContext.set(context);
+ threadLocalContext.set(null);
}
- public static OperationContext getInstance()
+ public static OperationContext getContext(final ExecutorFactory executorFactory)
{
- OperationContext token = tlContext.get();
+ OperationContext token = threadLocalContext.get();
if (token == null)
{
- token = new OperationContextImpl();
- tlContext.set(token);
+ token = new OperationContextImpl(executorFactory.getExecutor());
+ threadLocalContext.set(token);
}
return token;
}
-
+
+ public static void setContext(OperationContext context)
+ {
+ threadLocalContext.set(context);
+ }
+
+
private List<TaskHolder> tasks;
private volatile int storeLineUp = 0;
@@ -64,38 +70,21 @@
private int stored = 0;
private int replicated = 0;
-
+
private int errorCode = -1;
-
+
private String errorMessage = null;
-
+
private Executor executor;
-
+
private final AtomicInteger executorsPending = new AtomicInteger(0);
- /**
- * @param executor
- */
- public OperationContextImpl()
- {
- super();
- }
-
public OperationContextImpl(final Executor executor)
{
super();
this.executor = executor;
}
-
- /*
- * @see org.hornetq.core.persistence.OperationContext#reinstall()
- */
- public void reinstall()
- {
- setInstance(this);
- }
-
/** To be called by the replication manager, when new replication is added to the
queue */
public void lineUp()
{
@@ -106,7 +95,7 @@
{
replicationLineUp++;
}
-
+
/** this method needs to be called before the executor became operational */
public void setExecutor(Executor executor)
{
@@ -119,50 +108,56 @@
checkTasks();
}
- public boolean hasReplication()
- {
- return replicationLineUp > 0;
- }
-
/** You may have several actions to be done after a replication operation is
completed. */
- public synchronized void executeOnCompletion(final IOAsyncTask completion)
+ public void executeOnCompletion(final IOAsyncTask completion)
{
- if (tasks == null)
- {
- tasks = new LinkedList<TaskHolder>();
- minimalReplicated = replicationLineUp;
- minimalStore = storeLineUp;
- }
+ boolean executeNow = false;
- // On this case, we can just execute the context directly
- if (replicationLineUp == replicated && storeLineUp == stored)
+ synchronized (this)
{
- if (executor != null)
+ if (tasks == null)
{
- // We want to avoid the executor if everything is complete...
- // However, we can't execute the context if there are executions pending
- // We need to use the executor on this case
- if (executorsPending.get() == 0)
+ tasks = new LinkedList<TaskHolder>();
+ minimalReplicated = replicationLineUp;
+ minimalStore = storeLineUp;
+ }
+
+ // On this case, we can just execute the context directly
+ if (replicationLineUp == replicated && storeLineUp == stored)
+ {
+ if (executor != null)
{
- // No need to use an executor here or a context switch
- // there are no actions pending.. hence we can just execute the task
directly on the same thread
- completion.done();
+ // We want to avoid the executor if everything is complete...
+ // However, we can't execute the context if there are executions
pending
+ // We need to use the executor on this case
+ if (executorsPending.get() == 0)
+ {
+ // No need to use an executor here or a context switch
+ // there are no actions pending.. hence we can just execute the task
directly on the same thread
+ executeNow = true;
+ }
+ else
+ {
+ execute(completion);
+ }
}
else
{
- execute(completion);
+ executeNow = true;
}
}
else
{
- // Execute without an executor
- completion.done();
+ tasks.add(new TaskHolder(completion));
}
}
- else
+
+ if (executeNow)
{
- tasks.add(new TaskHolder(completion));
+ // Executing outside of any locks
+ completion.done();
}
+
}
/** To be called by the storage manager, when data is confirmed on the channel */
@@ -183,7 +178,7 @@
if (!holder.executed && stored >= holder.storeLined &&
replicated >= holder.replicationLined)
{
holder.executed = true;
-
+
if (executor != null)
{
// If set, we use an executor to avoid the server being single
threaded
@@ -193,7 +188,7 @@
{
holder.task.done();
}
-
+
iter.remove();
}
else
@@ -227,8 +222,6 @@
*/
public void complete()
{
- tlContext.set(null);
-
// TODO: test and fix exceptions on the Context
if (tasks != null && errorMessage != null)
{
@@ -237,16 +230,11 @@
run.task.onError(errorCode, errorMessage);
}
}
-
+
// We hold errors until the complete is set, or the callbacks will never get
informed
errorCode = -1;
errorMessage = null;
}
-
- public boolean isSync()
- {
- return false;
- }
/* (non-Javadoc)
* @see org.hornetq.core.asyncio.AIOCallback#onError(int, java.lang.String)
@@ -275,5 +263,4 @@
}
}
-
}
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-11-22
18:49:03 UTC (rev 8368)
+++
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-11-23
02:04:55 UTC (rev 8369)
@@ -374,4 +374,18 @@
return null;
}
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.persistence.StorageManager#setContext(org.hornetq.core.persistence.OperationContext)
+ */
+ public void setContext(OperationContext context)
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#clearContext()
+ */
+ public void clearContext()
+ {
+ }
+
}
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java 2009-11-22
18:49:03 UTC (rev 8368)
+++
branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/DuplicateIDCacheImpl.java 2009-11-23
02:04:55 UTC (rev 8369)
@@ -18,7 +18,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.Executor;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.StorageManager;
@@ -59,14 +58,11 @@
private final StorageManager storageManager;
private final boolean persist;
-
- private final Executor executor;
public DuplicateIDCacheImpl(final SimpleString address,
final int size,
final StorageManager storageManager,
- final boolean persist,
- final Executor executor)
+ final boolean persist)
{
this.address = address;
@@ -77,8 +73,6 @@
this.storageManager = storageManager;
this.persist = persist;
-
- this.executor = executor;
}
public void load(final List<Pair<byte[], Long>> theIds) throws Exception
@@ -144,7 +138,7 @@
storageManager.storeDuplicateID(address, duplID, recordID);
}
- addToCacheInMemory(duplID, recordID, null);
+ addToCacheInMemory(duplID, recordID);
}
else
{
@@ -161,12 +155,11 @@
}
}
-
- private synchronized void addToCacheInMemory(final byte[] duplID, final long recordID,
final Executor journalExecutor) throws Exception
+ private synchronized void addToCacheInMemory(final byte[] duplID, final long
recordID)
{
cache.add(new ByteArrayHolder(duplID));
- final Pair<ByteArrayHolder, Long> id;
+ Pair<ByteArrayHolder, Long> id;
if (pos < ids.size())
{
@@ -180,27 +173,13 @@
// reclaimed
id.a = new ByteArrayHolder(duplID);
- if (journalExecutor != null)
+ try
{
- // We can't execute any IO inside the Journal callback, so taking it
outside
- journalExecutor.execute(new Runnable()
- {
- public void run()
- {
- try
- {
- storageManager.deleteDuplicateID(id.b);
- }
- catch (Exception e)
- {
- log.warn("Error on deleting duplicate cache");
- }
- }
- });
+ storageManager.deleteDuplicateID(id.b);
}
- else
+ catch (Exception e)
{
- storageManager.deleteDuplicateID(id.b);
+ log.warn("Error on deleting duplicate cache", e);
}
id.b = recordID;
@@ -237,14 +216,8 @@
{
if (!done)
{
- try
- {
- addToCacheInMemory(duplID, recordID, executor);
- }
- catch (Exception shouldNotHappen)
- {
- // if you pass an executor to addtoCache, an exception will never happen
here
- }
+ addToCacheInMemory(duplID, recordID);
+
done = true;
}
}
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-22
18:49:03 UTC (rev 8368)
+++
branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-23
02:04:55 UTC (rev 8369)
@@ -743,9 +743,7 @@
if (cache == null)
{
- // TODO: What's the right executor?
- // Is there another way
- cache = new DuplicateIDCacheImpl(address, idCacheSize, storageManager,
persistIDCache, redistributorExecutorFactory.getExecutor());
+ cache = new DuplicateIDCacheImpl(address, idCacheSize, storageManager,
persistIDCache);
DuplicateIDCache oldCache = duplicateIDCaches.putIfAbsent(address, cache);
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-11-22
18:49:03 UTC (rev 8368)
+++
branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-11-23
02:04:55 UTC (rev 8369)
@@ -190,8 +190,10 @@
{
Configuration config = server.getConfiguration();
- storage = new JournalStorageManager(config,
server.getExecutorFactory().getExecutor());
+ storage = new JournalStorageManager(config, server.getExecutorFactory());
storage.start();
+
+ server.getManagementService().setStorageManager(storage);
bindingsJournal = storage.getBindingsJournal();
messagingJournal = storage.getMessageJournal();
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-22
18:49:03 UTC (rev 8368)
+++
branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-23
02:04:55 UTC (rev 8369)
@@ -14,8 +14,6 @@
package org.hornetq.core.replication.impl;
import java.util.LinkedHashSet;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -28,6 +26,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.persistence.OperationContext;
+import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
@@ -49,6 +48,7 @@
import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.core.replication.ReplicationManager;
+import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.SimpleString;
/**
@@ -83,6 +83,8 @@
private final Object replicationLock = new Object();
private final Queue<OperationContext> pendingTokens = new
ConcurrentLinkedQueue<OperationContext>();
+
+ private final ExecutorFactory executorFactory;
// Static --------------------------------------------------------
@@ -91,11 +93,12 @@
/**
* @param replicationConnectionManager
*/
- public ReplicationManagerImpl(final FailoverManager failoverManager, final int
backupWindowSize)
+ public ReplicationManagerImpl(final FailoverManager failoverManager, final
ExecutorFactory executorFactory, final int backupWindowSize)
{
super();
this.failoverManager = failoverManager;
this.backupWindowSize = backupWindowSize;
+ this.executorFactory = executorFactory;
}
// Public --------------------------------------------------------
@@ -430,7 +433,7 @@
{
boolean runItNow = false;
- OperationContext repliToken = getContext();
+ OperationContext repliToken = OperationContextImpl.getContext(executorFactory);
repliToken.replicationLineUp();
synchronized (replicationLock)
@@ -459,67 +462,16 @@
private void replicated()
{
- List<OperationContext> tokensToExecute = getTokens();
+ OperationContext ctx = pendingTokens.poll();
- for (OperationContext ctx : tokensToExecute)
+ if (ctx == null)
{
- ctx.replicationDone();
+ throw new IllegalStateException("Missing replication token on the
queue.");
}
- }
- public OperationContext getContext()
- {
- return OperationContextImpl.getInstance();
+ ctx.replicationDone();
}
- /**
- * This method will first get all the sync tokens (that won't go to the backup
node)
- * Then it will get the round trip tokens.
- * At last, if the list is empty, it will verify if there are any future tokens that
are sync tokens, to avoid a case where no more replication is done due to inactivity.
- * @return
- */
- private List<OperationContext> getTokens()
- {
- List<OperationContext> retList = new LinkedList<OperationContext>();
-
- OperationContext tokenPolled = null;
-
- // First will get all the non replicated tokens up to the first one that is not
replicated
- do
- {
- tokenPolled = pendingTokens.poll();
-
- if (tokenPolled == null)
- {
- throw new IllegalStateException("Missing replication token on the
queue.");
- }
-
- retList.add(tokenPolled);
-
- }
- while (tokenPolled.isSync());
-
- // This is to avoid a situation where we won't have more replicated packets
- // We need to make sure we process any pending sync packet up to the next non empty
packet
- synchronized (replicationLock)
- {
- while (!pendingTokens.isEmpty() && pendingTokens.peek().isSync())
- {
- tokenPolled = pendingTokens.poll();
- if (!tokenPolled.isSync())
- {
- throw new IllegalStateException("Replicatoin context is not a
roundtrip token as expected");
- }
-
- retList.add(tokenPolled);
-
- }
- }
-
- return retList;
- }
-
-
// Inner classes -------------------------------------------------
protected class ResponseHandler implements ChannelHandler
Modified: branches/ClebertCallback/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/server/Queue.java 2009-11-22
18:49:03 UTC (rev 8368)
+++ branches/ClebertCallback/src/main/org/hornetq/core/server/Queue.java 2009-11-23
02:04:55 UTC (rev 8369)
@@ -125,10 +125,6 @@
Collection<Consumer> getConsumers();
- /** We can't execute IO operation when inside the IOCallback /
TransactionCallback.
- * This method will will perform IO operations in a second thread */
- boolean checkDLQ(MessageReference ref, Executor ioExecutor) throws Exception;
-
boolean checkDLQ(MessageReference ref) throws Exception;
void lockDelivery();
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-22
18:49:03 UTC (rev 8368)
+++
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-23
02:04:55 UTC (rev 8369)
@@ -651,7 +651,7 @@
Executor sessionExecutor = executorFactory.getExecutor();
- OperationContext sessionContext = storageManager.newContext(sessionExecutor);
+ storageManager.newContext(sessionExecutor);
final ServerSessionImpl session = new ServerSessionImpl(name,
username,
@@ -667,7 +667,6 @@
postOffice,
resourceManager,
securityStore,
- sessionContext,
sessionExecutor,
channel,
managementService,
@@ -677,7 +676,8 @@
sessions.put(name, session);
- ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session,
sessionContext);
+ // The executor on the OperationContext here has to be the same as the session, or
we would have ordering issues on messages
+ ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session,
storageManager.newContext(sessionExecutor), storageManager);
session.setHandler(handler);
@@ -906,7 +906,7 @@
{
if (configuration.isPersistenceEnabled())
{
- return new JournalStorageManager(configuration, threadPool,
replicationManager);
+ return new JournalStorageManager(configuration, this.executorFactory,
replicationManager);
}
else
{
@@ -935,6 +935,7 @@
replicationFailoverManager = createBackupConnection(backupConnector,
threadPool, scheduledPool);
replicationManager = new ReplicationManagerImpl(replicationFailoverManager,
+ executorFactory,
configuration.getBackupWindowSize());
replicationManager.start();
}
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2009-11-22
18:49:03 UTC (rev 8368)
+++
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2009-11-23
02:04:55 UTC (rev 8369)
@@ -51,7 +51,6 @@
final Filter filter,
final boolean durable,
final boolean temporary,
- final Executor executor,
final ScheduledExecutorService scheduledExecutor,
final PostOffice postOffice,
final StorageManager storageManager,
@@ -63,7 +62,6 @@
filter,
durable,
temporary,
- executor,
scheduledExecutor,
postOffice,
storageManager,
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java 2009-11-22
18:49:03 UTC (rev 8368)
+++
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/QueueFactoryImpl.java 2009-11-23
02:04:55 UTC (rev 8369)
@@ -83,7 +83,6 @@
filter,
durable,
temporary,
- executorFactory.getExecutor(),
scheduledExecutor,
postOffice,
storageManager,
@@ -97,7 +96,6 @@
filter,
durable,
temporary,
- executorFactory.getExecutor(),
scheduledExecutor,
postOffice,
storageManager,
Modified: branches/ClebertCallback/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-22
18:49:03 UTC (rev 8368)
+++
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-23
02:04:55 UTC (rev 8369)
@@ -115,9 +115,6 @@
private final ScheduledExecutorService scheduledExecutor;
- /** We can't perform any operation on the journal while inside the Transactional
operations. */
- private final Executor journalExecutor;
-
private final SimpleString address;
private Redistributor redistributor;
@@ -142,7 +139,6 @@
final Filter filter,
final boolean durable,
final boolean temporary,
- final Executor executor,
final ScheduledExecutorService scheduledExecutor,
final PostOffice postOffice,
final StorageManager storageManager,
@@ -167,8 +163,6 @@
this.addressSettingsRepository = addressSettingsRepository;
this.scheduledExecutor = scheduledExecutor;
-
- this.journalExecutor = executor;
direct = true;
@@ -939,37 +933,12 @@
public boolean checkDLQ(final MessageReference reference) throws Exception
{
- return checkDLQ(reference, null);
- }
-
- public boolean checkDLQ(final MessageReference reference, Executor ioExecutor) throws
Exception
- {
ServerMessage message = reference.getMessage();
if (message.isDurable() && durable)
{
- if (ioExecutor != null)
- {
- ioExecutor.execute(new Runnable()
- {
- public void run()
- {
- try
- {
- storageManager.updateDeliveryCount(reference);
- storageManager.completeOperations();
- }
- catch (Exception e)
- {
- log.warn("Can't update delivery count on checkDLQ",
e);
- }
- }
- });
- }
- else
- {
- storageManager.updateDeliveryCount(reference);
- }
+ storageManager.updateDeliveryCount(reference);
+ storageManager.waitOnOperations();
}
AddressSettings addressSettings =
addressSettingsRepository.getMatch(address.toString());
@@ -978,28 +947,8 @@
if (maxDeliveries > 0 && reference.getDeliveryCount() >=
maxDeliveries)
{
- if (ioExecutor != null)
- {
- ioExecutor.execute(new Runnable()
- {
- public void run()
- {
- try
- {
- sendToDeadLetterAddress(reference);
- storageManager.completeOperations();
- }
- catch (Exception e)
- {
- log.warn("Error on DLQ send", e);
- }
- }
- });
- }
- else
- {
- sendToDeadLetterAddress(reference);
- }
+ sendToDeadLetterAddress(reference);
+ storageManager.waitOnOperations();
return false;
}
@@ -1011,28 +960,7 @@
{
reference.setScheduledDeliveryTime(System.currentTimeMillis() +
redeliveryDelay);
- if (ioExecutor != null)
- {
- ioExecutor.execute(new Runnable()
- {
- public void run()
- {
- try
- {
- storageManager.updateScheduledDeliveryTime(reference);
- storageManager.completeOperations();
- }
- catch (Exception e)
- {
- log.warn("Error on DLQ send", e);
- }
- }
- });
- }
- else
- {
- storageManager.updateScheduledDeliveryTime(reference);
- }
+ storageManager.updateScheduledDeliveryTime(reference);
}
deliveringCount.decrementAndGet();
@@ -1495,23 +1423,14 @@
// also note then when this happens as part of a trasaction its the tx commt
of the ack that is important
// not this
-
- // and this has to happen in a different thread
-
- journalExecutor.execute(new Runnable()
+ try
{
- public void run()
- {
- try
- {
- storageManager.deleteMessage(message.getMessageID());
- }
- catch (Exception e)
- {
- log.warn("Unable to remove message id = " +
message.getMessageID() + " please remove manually");
- }
- }
- });
+ storageManager.deleteMessage(message.getMessageID());
+ }
+ catch (Exception e)
+ {
+ log.warn("Unable to remove message id = " +
message.getMessageID() + " please remove manually");
+ }
}
}
@@ -1584,7 +1503,7 @@
{
try
{
- if (ref.getQueue().checkDLQ(ref, journalExecutor))
+ if (ref.getQueue().checkDLQ(ref))
{
LinkedList<MessageReference> toCancel =
queueMap.get(ref.getQueue());
@@ -1600,8 +1519,6 @@
}
catch (Exception e)
{
- // checkDLQ here will be using an executor, this shouldn't happen
- // don't you just hate checked exceptions in java?
log.warn("Error on checkDLQ", e);
}
}
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-22
18:49:03 UTC (rev 8368)
+++
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-23
02:04:55 UTC (rev 8369)
@@ -39,9 +39,7 @@
import org.hornetq.core.management.ManagementService;
import org.hornetq.core.management.Notification;
import org.hornetq.core.paging.PagingStore;
-import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.BindingType;
import org.hornetq.core.postoffice.Bindings;
@@ -189,10 +187,6 @@
private final SimpleString managementAddress;
- /** We always use the same operation context for the session.
- * With that we can perform extra checks */
- private final OperationContext sessionOperationContext;
-
// The current currentLargeMessage being processed
private volatile LargeServerMessage currentLargeMessage;
@@ -218,7 +212,6 @@
final PostOffice postOffice,
final ResourceManager resourceManager,
final SecurityStore securityStore,
- final OperationContext sessionOperationContext,
final Executor executor,
final Channel channel,
final ManagementService managementService,
@@ -249,8 +242,6 @@
this.securityStore = securityStore;
- this.sessionOperationContext = sessionOperationContext;
-
this.executor = executor;
if (!xa)
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
===================================================================
---
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java 2009-11-22
18:49:03 UTC (rev 8368)
+++
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java 2009-11-23
02:04:55 UTC (rev 8369)
@@ -46,6 +46,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.OperationContext;
+import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.remoting.ChannelHandler;
import org.hornetq.core.remoting.Packet;
import org.hornetq.core.remoting.impl.wireformat.CreateQueueMessage;
@@ -90,10 +91,16 @@
private final ServerSession session;
private final OperationContext sessionContext;
+
+ // The StorageManager is used to set sessionContext as the current context and to complete its operations
+ private final StorageManager storageManager;
- public ServerSessionPacketHandler(final ServerSession session, OperationContext
sessionContext)
+ public ServerSessionPacketHandler(final ServerSession session, OperationContext
sessionContext, StorageManager storageManager)
{
this.session = session;
+
+ this.storageManager = storageManager;
+
this.sessionContext = sessionContext;
}
@@ -108,7 +115,7 @@
if (sessionContext != null)
{
- sessionContext.reinstall();
+ storageManager.setContext(sessionContext);
}
try
@@ -303,7 +310,7 @@
{
if (sessionContext != null)
{
- sessionContext.complete();
+ storageManager.completeOperations();
}
}
}
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/largemessage/ServerLargeMessageTest.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/largemessage/ServerLargeMessageTest.java 2009-11-22
18:49:03 UTC (rev 8368)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/largemessage/ServerLargeMessageTest.java 2009-11-23
02:04:55 UTC (rev 8369)
@@ -13,6 +13,8 @@
package org.hornetq.tests.integration.largemessage;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.hornetq.core.config.Configuration;
@@ -21,6 +23,8 @@
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.ExecutorFactory;
+import org.hornetq.utils.OrderedExecutorFactory;
/**
* A ServerLargeMessageTest
@@ -36,12 +40,32 @@
// Attributes ----------------------------------------------------
+ ExecutorService executor;
+
+ ExecutorFactory execFactory;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ // Must override setUp() (capital U) so the framework runs it; testLargeMessageCopy needs execFactory.
+ executor = Executors.newCachedThreadPool();
+
+ execFactory = new OrderedExecutorFactory(executor);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ executor.shutdown();
+
+ super.tearDown();
+ }
+
public void testLargeMessageCopy() throws Exception
{
clearData();
@@ -52,7 +76,7 @@
configuration.setJournalType(JournalType.ASYNCIO);
- final JournalStorageManager journal = new JournalStorageManager(configuration,
Executors.newCachedThreadPool());
+ final JournalStorageManager journal = new JournalStorageManager(configuration,
execFactory);
journal.start();
LargeServerMessage msg = journal.createLargeMessage();
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java 2009-11-22
18:49:03 UTC (rev 8368)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java 2009-11-23
02:04:55 UTC (rev 8369)
@@ -18,6 +18,8 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.hornetq.core.config.Configuration;
@@ -30,6 +32,8 @@
import org.hornetq.core.server.Queue;
import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.ExecutorFactory;
+import org.hornetq.utils.OrderedExecutorFactory;
/**
* A DeleteMessagesRestartTest
@@ -48,11 +52,32 @@
// Attributes ----------------------------------------------------
+ ExecutorService executor;
+
+ ExecutorFactory execFactory;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
// Public --------------------------------------------------------
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ executor = Executors.newCachedThreadPool();
+
+ this.execFactory = new OrderedExecutorFactory(executor);
+ }
+
+ protected void tearDown() throws Exception
+ {
+ executor.shutdown();
+
+ super.tearDown();
+ }
+
public void testRestartStorageManager() throws Exception
{
@@ -67,7 +92,8 @@
PostOffice postOffice = new FakePostOffice();
- final JournalStorageManager journal = new JournalStorageManager(configuration,
Executors.newCachedThreadPool());
+ final JournalStorageManager journal = new JournalStorageManager(configuration,
execFactory);
+
try
{
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-22
18:49:03 UTC (rev 8368)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-23
02:04:55 UTC (rev 8369)
@@ -53,7 +53,7 @@
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.remoting.Interceptor;
import org.hornetq.core.remoting.Packet;
import org.hornetq.core.remoting.RemotingConnection;
@@ -70,6 +70,7 @@
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.HornetQThreadFactory;
+import org.hornetq.utils.OrderedExecutorFactory;
import org.hornetq.utils.SimpleString;
/**
@@ -89,6 +90,8 @@
private ThreadFactory tFactory;
private ExecutorService executor;
+
+ private ExecutorFactory factory;
private ScheduledExecutorService scheduledExecutor;
@@ -114,6 +117,7 @@
try
{
ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+ this.factory,
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
manager.start();
manager.stop();
@@ -140,6 +144,7 @@
try
{
ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+ this.factory,
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
manager.start();
try
@@ -182,6 +187,7 @@
try
{
ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+ this.factory,
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
manager.start();
@@ -189,6 +195,7 @@
try
{
ReplicationManagerImpl manager2 = new
ReplicationManagerImpl(failoverManager,
+ this.factory,
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
manager2.start();
@@ -223,6 +230,7 @@
try
{
ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+ this.factory,
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
try
@@ -241,7 +249,7 @@
server.stop();
}
}
-
+
public void testSendPackets() throws Exception
{
@@ -257,7 +265,10 @@
try
{
+ StorageManager storage = getStorage();
+
ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+ this.factory,
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
manager.start();
@@ -276,7 +287,7 @@
replicatedJournal.appendPrepareRecord(3, new FakeData(), false);
replicatedJournal.appendRollbackRecord(3, false);
- blockOnReplication(manager);
+ blockOnReplication(storage, manager);
assertEquals(0, manager.getActiveTokens().size());
@@ -294,7 +305,7 @@
manager.pageWrite(pgmsg, 3);
manager.pageWrite(pgmsg, 4);
- blockOnReplication(manager);
+ blockOnReplication(storage, manager);
PagingManager pagingManager = createPageManager(server.getStorageManager(),
server.getConfiguration(),
@@ -313,7 +324,7 @@
manager.pageDeleted(dummy, 5);
manager.pageDeleted(dummy, 6);
- blockOnReplication(manager);
+ blockOnReplication(storage, manager);
ServerMessageImpl serverMsg = new ServerMessageImpl();
serverMsg.setMessageID(500);
@@ -328,7 +339,7 @@
manager.largeMessageDelete(500);
- blockOnReplication(manager);
+ blockOnReplication(storage, manager);
store.start();
@@ -363,7 +374,9 @@
try
{
+ StorageManager storage = getStorage();
ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+ this.factory,
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
manager.start();
@@ -377,7 +390,7 @@
}
final CountDownLatch latch = new CountDownLatch(1);
- OperationContextImpl.getInstance().executeOnCompletion(new IOAsyncTask()
+ storage.afterCompleteOperations(new IOAsyncTask()
{
public void onError(int errorCode, String errorMessage)
@@ -401,13 +414,21 @@
}
/**
+ * @return a new JournalStorageManager built from the default config and this test's executor factory
+ */
+ private JournalStorageManager getStorage()
+ {
+ return new JournalStorageManager(createDefaultConfig(), factory);
+ }
+
+ /**
* @param manager
* @return
*/
- private void blockOnReplication(ReplicationManagerImpl manager) throws Exception
+ private void blockOnReplication(StorageManager storage, ReplicationManagerImpl
manager) throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
- OperationContextImpl.getInstance().executeOnCompletion(new IOAsyncTask()
+ storage.afterCompleteOperations(new IOAsyncTask()
{
public void onError(int errorCode, String errorMessage)
@@ -430,6 +451,7 @@
try
{
ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+ this.factory,
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
manager.start();
fail("Exception expected");
@@ -455,7 +477,9 @@
try
{
+ StorageManager storage = getStorage();
ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+ this.factory,
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
manager.start();
@@ -464,7 +488,7 @@
replicatedJournal.appendPrepareRecord(1, new FakeData(), false);
final CountDownLatch latch = new CountDownLatch(1);
- OperationContextImpl.getInstance().executeOnCompletion(new IOAsyncTask()
+ storage.afterCompleteOperations(new IOAsyncTask()
{
public void onError(int errorCode, String errorMessage)
@@ -505,7 +529,9 @@
try
{
+ StorageManager storage = getStorage();
ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
+ this.factory,
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
manager.start();
@@ -515,7 +541,7 @@
final CountDownLatch latch = new CountDownLatch(numberOfAdds);
- OperationContext ctx = OperationContextImpl.getInstance();
+ OperationContext ctx = storage.getContext();
for (int i = 0; i < numberOfAdds; i++)
{
@@ -593,9 +619,26 @@
executor = Executors.newCachedThreadPool(tFactory);
scheduledExecutor = new ScheduledThreadPoolExecutor(10, tFactory);
+
+ factory = new OrderedExecutorFactory(executor);
+ }
+ protected void tearDown() throws Exception
+ {
+
+ executor.shutdown();
+
+ scheduledExecutor.shutdown();
+
+ tFactory = null;
+
+ scheduledExecutor = null;
+
+ super.tearDown();
+
}
+
private FailoverManagerImpl createFailoverManager()
{
return createFailoverManager(null);
@@ -622,22 +665,6 @@
scheduledExecutor,
interceptors);
}
-
- protected void tearDown() throws Exception
- {
-
- executor.shutdown();
-
- scheduledExecutor.shutdown();
-
- tFactory = null;
-
- scheduledExecutor = null;
-
- super.tearDown();
-
- }
-
protected PagingManager createPageManager(StorageManager storageManager,
Configuration configuration,
ExecutorFactory executorFactory,
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java 2009-11-22
18:49:03 UTC (rev 8368)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java 2009-11-23
02:04:55 UTC (rev 8369)
@@ -15,6 +15,8 @@
import java.io.File;
import java.util.HashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
@@ -29,6 +31,7 @@
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.SimpleString;
/**
@@ -110,6 +113,17 @@
final int transInterval,
final int numberOfThreads) throws Exception
{
+
+ final ExecutorService executor = Executors.newCachedThreadPool();
+
+ ExecutorFactory factory = new ExecutorFactory()
+ {
+ public Executor getExecutor()
+ {
+ return executor;
+ }
+ };
+
FileConfiguration configuration = new FileConfiguration();
configuration.start();
@@ -122,7 +136,7 @@
PostOffice postOffice = new FakePostOffice();
final JournalStorageManager journal = new JournalStorageManager(configuration,
-
Executors.newCachedThreadPool());
+ factory);
journal.start();
HashMap<Long, Queue> queues = new HashMap<Long, Queue>();
@@ -187,6 +201,7 @@
if (transInterval > 0 && i % transInterval == 0)
{
journal.commit(trans);
+ journal.waitOnOperations();
commits++;
trans = transactionGenerator.incrementAndGet();
commitPending = false;
@@ -194,7 +209,10 @@
}
if (commitPending)
+ {
journal.commit(trans);
+ journal.waitOnOperations();
+ }
long end = System.currentTimeMillis();
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java 2009-11-22
18:49:03 UTC (rev 8368)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/timing/core/server/impl/QueueImplTest.java 2009-11-23
02:04:55 UTC (rev 8369)
@@ -15,8 +15,6 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -44,22 +42,18 @@
private ScheduledExecutorService scheduledExecutor;
- private ExecutorService executor;
+ //private ExecutorService executor;
public void setUp() throws Exception
{
super.setUp();
scheduledExecutor = new ScheduledThreadPoolExecutor(1);
-
- executor = Executors.newSingleThreadExecutor();
}
public void tearDown() throws Exception
{
scheduledExecutor.shutdownNow();
-
- executor.shutdown();
super.tearDown();
}
@@ -78,7 +72,7 @@
public void testScheduledNoConsumer() throws Exception
{
- Queue queue = new QueueImpl(1, new SimpleString("address1"), new
SimpleString("queue1"), null, false, true, executor, scheduledExecutor, null,
null, null);
+ Queue queue = new QueueImpl(1, new SimpleString("address1"), new
SimpleString("queue1"), null, false, true, scheduledExecutor, null, null,
null);
//Send one scheduled
@@ -144,7 +138,7 @@
private void testScheduled(boolean direct) throws Exception
{
- Queue queue = new QueueImpl(1,new SimpleString("address1"), new
SimpleString("queue1"), null, false, true, executor, scheduledExecutor, null,
null, null);
+ Queue queue = new QueueImpl(1,new SimpleString("address1"), new
SimpleString("queue1"), null, false, true, scheduledExecutor, null, null,
null);
FakeConsumer consumer = null;
@@ -251,7 +245,7 @@
return HandleStatus.HANDLED;
}
};
- Queue queue = new QueueImpl(1, new SimpleString("address1"), queue1,
null, false, true, executor, scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, new SimpleString("address1"), queue1,
null, false, true, scheduledExecutor, null, null, null);
MessageReference messageReference = generateReference(queue, 1);
queue.addConsumer(consumer);
messageReference.setScheduledDeliveryTime(System.currentTimeMillis() + 2000);
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-22
18:49:03 UTC (rev 8368)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-23
02:04:55 UTC (rev 8369)
@@ -1264,6 +1264,20 @@
return null;
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#clearContext()
+ */
+ public void clearContext()
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see
org.hornetq.core.persistence.StorageManager#setContext(org.hornetq.core.persistence.OperationContext)
+ */
+ public void setContext(OperationContext context)
+ {
+ }
+
}
class FakeStoreFactory implements PagingStoreFactory
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2009-11-22
18:49:03 UTC (rev 8368)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2009-11-23
02:04:55 UTC (rev 8369)
@@ -17,6 +17,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -38,6 +39,8 @@
import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
+import org.hornetq.utils.ExecutorFactory;
+import org.hornetq.utils.OrderedExecutorFactory;
import org.hornetq.utils.Pair;
import org.hornetq.utils.SimpleString;
@@ -60,6 +63,8 @@
ExecutorService executor;
+ ExecutorFactory factory;
+
@Override
protected void tearDown() throws Exception
{
@@ -71,6 +76,7 @@
{
super.setUp();
executor = Executors.newSingleThreadExecutor();
+ factory = new OrderedExecutorFactory(executor);
}
// Public --------------------------------------------------------
@@ -96,7 +102,7 @@
ScheduledExecutorService scheduledThreadPool =
Executors.newScheduledThreadPool(ConfigurationImpl.DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE);
- journal = new JournalStorageManager(configuration,
Executors.newCachedThreadPool());
+ journal = new JournalStorageManager(configuration, factory);
journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new
ArrayList<GroupingInfo>());
@@ -111,7 +117,7 @@
assertEquals(0, mapDups.size());
- DuplicateIDCacheImpl cacheID = new DuplicateIDCacheImpl(ADDRESS, 10, journal,
true, executor);
+ DuplicateIDCacheImpl cacheID = new DuplicateIDCacheImpl(ADDRESS, 10, journal,
true);
for (int i = 0; i < 100; i++)
{
@@ -120,7 +126,7 @@
journal.stop();
- journal = new JournalStorageManager(configuration,
Executors.newCachedThreadPool());
+ journal = new JournalStorageManager(configuration, factory);
journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new
ArrayList<GroupingInfo>());
@@ -136,7 +142,7 @@
assertEquals(10, values.size());
- cacheID = new DuplicateIDCacheImpl(ADDRESS, 10, journal, true, executor);
+ cacheID = new DuplicateIDCacheImpl(ADDRESS, 10, journal, true);
cacheID.load(values);
for (int i = 0; i < 100; i++)
@@ -148,7 +154,7 @@
mapDups.clear();
- journal = new JournalStorageManager(configuration,
Executors.newCachedThreadPool());
+ journal = new JournalStorageManager(configuration, factory);
journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>(), new
ArrayList<GroupingInfo>());
Modified:
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
---
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2009-11-22
18:49:03 UTC (rev 8368)
+++
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/server/impl/QueueImplTest.java 2009-11-23
02:04:55 UTC (rev 8369)
@@ -17,7 +17,6 @@
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -46,19 +45,15 @@
private ScheduledExecutorService scheduledExecutor;
- private ExecutorService executor;
-
protected void setUp() throws Exception
{
super.setUp();
scheduledExecutor = Executors.newSingleThreadScheduledExecutor();
- executor = Executors.newSingleThreadExecutor();
}
protected void tearDown() throws Exception
{
scheduledExecutor.shutdown();
- executor.shutdown();
super.tearDown();
}
@@ -70,18 +65,18 @@
{
final SimpleString name = new SimpleString("oobblle");
- Queue queue = new QueueImpl(1, address1, name, null, false, true, executor,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, name, null, false, true,
scheduledExecutor, null, null, null);
assertEquals(name, queue.getName());
}
public void testDurable()
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, false, executor,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, false,
scheduledExecutor, null, null, null);
assertFalse(queue.isDurable());
- queue = new QueueImpl(1, address1, queue1, null, true, false, executor,
scheduledExecutor, null, null, null);
+ queue = new QueueImpl(1, address1, queue1, null, true, false, scheduledExecutor,
null, null, null);
assertTrue(queue.isDurable());
}
@@ -94,7 +89,7 @@
Consumer cons3 = new FakeConsumer();
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
assertEquals(0, queue.getConsumerCount());
@@ -135,7 +130,7 @@
public void testGetFilter()
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
assertNull(queue.getFilter());
@@ -152,7 +147,7 @@
}
};
- queue = new QueueImpl(1, address1, queue1, filter, false, true, executor,
scheduledExecutor, null, null, null);
+ queue = new QueueImpl(1, address1, queue1, filter, false, true, scheduledExecutor,
null, null, null);
assertEquals(filter, queue.getFilter());
@@ -160,7 +155,7 @@
public void testSimpleadd()
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
final int numMessages = 10;
@@ -179,7 +174,7 @@
public void testSimpleDirectDelivery() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
FakeConsumer consumer = new FakeConsumer();
@@ -207,7 +202,7 @@
public void testSimpleNonDirectDelivery() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
final int numMessages = 10;
@@ -245,7 +240,7 @@
public void testBusyConsumer() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
FakeConsumer consumer = new FakeConsumer();
@@ -289,7 +284,7 @@
public void testBusyConsumerThenAddMoreMessages() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
FakeConsumer consumer = new FakeConsumer();
@@ -356,7 +351,7 @@
public void testAddFirstadd() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
final int numMessages = 10;
@@ -416,7 +411,7 @@
null,
false,
true,
- executor, scheduledExecutor,
+ scheduledExecutor,
new FakePostOffice(),
null,
null);
@@ -573,7 +568,7 @@
public void testConsumerReturningNull() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
class NullConsumer implements Consumer
{
@@ -606,7 +601,7 @@
public void testRoundRobinWithQueueing() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
final int numMessages = 10;
@@ -649,7 +644,7 @@
public void testRoundRobinDirect() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
final int numMessages = 10;
@@ -690,7 +685,7 @@
public void testWithPriorities() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
final int numMessages = 10;
@@ -757,7 +752,7 @@
public void testConsumerWithFilterAddAndRemove()
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
Filter filter = new FakeFilter("fruit", "orange");
@@ -766,7 +761,7 @@
public void testList()
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
final int numMessages = 20;
@@ -790,7 +785,7 @@
public void testListWithFilter()
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
final int numMessages = 20;
@@ -832,7 +827,7 @@
null,
false,
true,
- executor, scheduledExecutor,
+ scheduledExecutor,
new FakePostOffice(),
null,
null);
@@ -904,7 +899,7 @@
public void testBusyConsumerWithFilterFirstCallBusy() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
FakeConsumer consumer = new FakeConsumer(FilterImpl.createFilter("color =
'green'"));
@@ -945,7 +940,7 @@
public void testBusyConsumerWithFilterThenAddMoreMessages() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
FakeConsumer consumer = new FakeConsumer(FilterImpl.createFilter("color =
'green'"));
@@ -1019,7 +1014,7 @@
public void testConsumerWithFilterThenAddMoreMessages() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
final int numMessages = 10;
List<MessageReference> refs = new ArrayList<MessageReference>();
@@ -1089,7 +1084,7 @@
null,
false,
true,
- executor, scheduledExecutor,
+ scheduledExecutor,
new FakePostOffice(),
null,
null);
@@ -1181,7 +1176,7 @@
public void testMessageOrder() throws Exception
{
FakeConsumer consumer = new FakeConsumer();
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
@@ -1201,7 +1196,7 @@
public void testMessagesAdded() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
@@ -1213,7 +1208,7 @@
public void testGetReference() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
@@ -1226,7 +1221,7 @@
public void testGetNonExistentReference() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
MessageReference messageReference = generateReference(queue, 1);
MessageReference messageReference2 = generateReference(queue, 2);
MessageReference messageReference3 = generateReference(queue, 3);
@@ -1243,7 +1238,7 @@
*/
public void testPauseAndResumeWithAsync() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
// pauses the queue
queue.pause();
@@ -1298,7 +1293,7 @@
public void testPauseAndResumeWithDirect() throws Exception
{
- Queue queue = new QueueImpl(1, address1, queue1, null, false, true, executor,
scheduledExecutor, null, null, null);
+ Queue queue = new QueueImpl(1, address1, queue1, null, false, true,
scheduledExecutor, null, null, null);
// Now add a consumer
FakeConsumer consumer = new FakeConsumer();
Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java 2009-11-22 18:49:03 UTC (rev 8368)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakeQueueFactory.java 2009-11-23 02:04:55 UTC (rev 8369)
@@ -35,14 +35,12 @@
{
private final ScheduledExecutorService scheduledExecutor =
Executors.newSingleThreadScheduledExecutor();
- private final ExecutorService executor = Executors.newSingleThreadExecutor();
-
private PostOffice postOffice;
public Queue createQueue(long persistenceID, final SimpleString address, SimpleString
name, Filter filter,
boolean durable, boolean temporary)
{
- return new QueueImpl(persistenceID, address, name, filter, durable, temporary,
executor, scheduledExecutor, postOffice, null, null);
+ return new QueueImpl(persistenceID, address, name, filter, durable, temporary,
scheduledExecutor, postOffice, null, null);
}
public void setPostOffice(PostOffice postOffice)
@@ -54,8 +52,6 @@
public void stop() throws Exception
{
scheduledExecutor.shutdown();
-
- executor.shutdown();
}
}
Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/util/UnitTestCase.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/util/UnitTestCase.java 2009-11-22 18:49:03 UTC (rev 8368)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/util/UnitTestCase.java 2009-11-23 02:04:55 UTC (rev 8369)
@@ -50,6 +50,8 @@
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.PostOffice;
@@ -647,6 +649,8 @@
@Override
protected void tearDown() throws Exception
{
+ OperationContextImpl.clearContext();
+
deleteDirectory(new File(getTestDir()));
assertEquals(0, InVMRegistry.instance.size());