Author: clebert.suconic(a)jboss.com
Date: 2009-11-21 18:08:15 -0500 (Sat, 21 Nov 2009)
New Revision: 8365
Modified:
branches/ClebertCallback/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/persistence/OperationContext.java
branches/ClebertCallback/src/main/org/hornetq/core/persistence/StorageManager.java
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/SyncOperation.java
branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/replication/ReplicationManager.java
branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/client/OrderTest.java
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java
branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
Simplifying ordering and switch context
Modified: branches/ClebertCallback/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-11-21 20:49:38 UTC (rev 8364)
+++ branches/ClebertCallback/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-11-21 23:08:15 UTC (rev 8365)
@@ -744,7 +744,7 @@
// TODO: Talk to Andy and Jeff about a better way to sync this...
System.out.println("Waiting");
final CountDownLatch latch = new CountDownLatch(1);
- OperationContextImpl.getContext().executeOnCompletion(new IOAsyncTask()
+ OperationContextImpl.getInstance().executeOnCompletion(new IOAsyncTask()
{
public void done()
@@ -759,7 +759,7 @@
});
- OperationContextImpl.getContext().complete();
+ OperationContextImpl.getInstance().complete();
latch.await(5, TimeUnit.SECONDS);
System.out.println("Done");
Modified: branches/ClebertCallback/src/main/org/hornetq/core/persistence/OperationContext.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/persistence/OperationContext.java 2009-11-21 20:49:38 UTC (rev 8364)
+++ branches/ClebertCallback/src/main/org/hornetq/core/persistence/OperationContext.java 2009-11-21 23:08:15 UTC (rev 8365)
@@ -13,6 +13,8 @@
package org.hornetq.core.persistence;
+import java.util.concurrent.Executor;
+
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.IOCompletion;
@@ -29,7 +31,16 @@
{
boolean hasReplication();
+
+ /** Reattach the context to the current thread */
+ void reinstall();
+
+ /** The executor used on responses.
+ * If this is not set, it will use the current thread. */
+ void setExecutor(Executor executor);
+ /** Execute the task when all IO operations are complete,
+ * Or execute it immediately if nothing is pending. */
void executeOnCompletion(IOAsyncTask runnable);
void replicationLineUp();
Modified: branches/ClebertCallback/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/persistence/StorageManager.java 2009-11-21 20:49:38 UTC (rev 8364)
+++ branches/ClebertCallback/src/main/org/hornetq/core/persistence/StorageManager.java 2009-11-21 23:08:15 UTC (rev 8365)
@@ -15,6 +15,7 @@
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executor;
import javax.transaction.xa.Xid;
@@ -47,6 +48,13 @@
*/
public interface StorageManager extends HornetQComponent
{
+
+ /** Get the context associated with the thread for later reuse */
+ OperationContext getContext();
+
+ /** It just creates an OperationContext without associating it to any threads */
+ OperationContext newContext(Executor executor);
+
// Message related operations
void pageClosed(SimpleString storeName, int pageNumber);
Modified: branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-21 20:49:38 UTC (rev 8364)
+++ branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-21 23:08:15 UTC (rev 8365)
@@ -55,6 +55,7 @@
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.persistence.GroupingInfo;
+import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.Binding;
@@ -294,14 +295,7 @@
*/
public void completeOperations()
{
- if (replicator != null)
- {
- replicator.closeContext();
- }
- else
- {
- OperationContextImpl.getContext().complete();
- }
+ OperationContextImpl.getInstance().complete();
}
public boolean isReplicated()
@@ -373,9 +367,26 @@
// TODO: shouldn't those page methods be on the PageManager? ^^^^
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#getContext()
+ */
+ public OperationContext getContext()
+ {
+ return OperationContextImpl.getInstance();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#newContext()
+ */
+ public OperationContext newContext(Executor executor)
+ {
+ return new OperationContextImpl(executor);
+ }
+
public void afterCompleteOperations(IOAsyncTask run)
{
- OperationContextImpl.getContext().executeOnCompletion(run);
+ OperationContextImpl.getInstance().executeOnCompletion(run);
}
public UUID getPersistentID()
@@ -506,7 +517,7 @@
public void sync()
{
- messageJournal.sync(OperationContextImpl.getContext());
+ messageJournal.sync(OperationContextImpl.getInstance());
}
// Transactional operations
@@ -1390,7 +1401,7 @@
private IOCompletion getIOContext()
{
- return OperationContextImpl.getContext();
+ return OperationContextImpl.getInstance();
}
private void checkAndCreateDir(final String dir, final boolean create)
@@ -1917,7 +1928,6 @@
}
}
-
}
Modified: branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-11-21 20:49:38 UTC (rev 8364)
+++ branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/OperationContextImpl.java 2009-11-21 23:08:15 UTC (rev 8365)
@@ -16,21 +16,32 @@
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.persistence.OperationContext;
+import sun.security.util.PendingException;
+
/**
- * A ReplicationToken
- *
+ *
+ * This class will hold operations when there are IO operations...
+ * and it will
+ *
* @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
*/
public class OperationContextImpl implements OperationContext
{
private static final ThreadLocal<OperationContext> tlContext = new ThreadLocal<OperationContext>();
- public static OperationContext getContext()
+ public static void setInstance(OperationContext context)
{
+ tlContext.set(context);
+ }
+
+ public static OperationContext getInstance()
+ {
OperationContext token = tlContext.get();
if (token == null)
{
@@ -53,6 +64,14 @@
private int stored = 0;
private int replicated = 0;
+
+ private int errorCode = -1;
+
+ private String errorMessage = null;
+
+ private Executor executor;
+
+ private final AtomicInteger executorsPending = new AtomicInteger(0);
/**
* @param executor
@@ -62,6 +81,21 @@
super();
}
+ public OperationContextImpl(final Executor executor)
+ {
+ super();
+ this.executor = executor;
+ }
+
+ /*
+ * @see org.hornetq.core.persistence.OperationContext#reinstall()
+ */
+ public void reinstall()
+ {
+ setInstance(this);
+ }
+
+
/** To be called by the replication manager, when new replication is added to the queue */
public void lineUp()
{
@@ -72,6 +106,12 @@
{
replicationLineUp++;
}
+
+ /** this method needs to be called before the executor became operational */
+ public void setExecutor(Executor executor)
+ {
+ this.executor = executor;
+ }
public synchronized void replicationDone()
{
@@ -85,7 +125,7 @@
}
/** You may have several actions to be done after a replication operation is completed. */
- public synchronized void executeOnCompletion(IOAsyncTask completion)
+ public synchronized void executeOnCompletion(final IOAsyncTask completion)
{
if (tasks == null)
{
@@ -94,9 +134,30 @@
minimalStore = storeLineUp;
}
+ // On this case, we can just execute the context directly
if (replicationLineUp == replicated && storeLineUp == stored)
{
- completion.done();
+ if (executor != null)
+ {
+ // We want to avoid the executor if everything is complete...
+ // However, we can't execute the context if there are executions pending
+ // We need to use the executor on this case
+ if (executorsPending.get() == 0)
+ {
+ // No need to use an executor here or a context switch
+ // there are no actions pending.. hence we can just execute the task directly on the same thread
+ completion.done();
+ }
+ else
+ {
+ execute(completion);
+ }
+ }
+ else
+ {
+ // Execute without an executor
+ completion.done();
+ }
}
else
{
@@ -122,19 +183,64 @@
if (!holder.executed && stored >= holder.storeLined &&
replicated >= holder.replicationLined)
{
holder.executed = true;
- holder.task.done();
+
+ if (executor != null)
+ {
+ // If set, we use an executor to avoid the server being single threaded
+ execute(holder.task);
+ }
+ else
+ {
+ holder.task.done();
+ }
+
iter.remove();
}
+ else
+ {
+ // The actions need to be done in order...
+ // so it must achieve both conditions before we can proceed to more tasks
+ break;
+ }
}
}
}
+ /**
+ * @param holder
+ */
+ private void execute(final IOAsyncTask task)
+ {
+ executorsPending.incrementAndGet();
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ task.done();
+ executorsPending.decrementAndGet();
+ }
+ });
+ }
+
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationToken#complete()
*/
public void complete()
{
tlContext.set(null);
+
+ // TODO: test and fix exceptions on the Context
+ if (tasks != null && errorMessage != null)
+ {
+ for (TaskHolder run : tasks)
+ {
+ run.task.onError(errorCode, errorMessage);
+ }
+ }
+
+ // We hold errors until the complete is set, or the callbacks will never get informed
+ errorCode = -1;
+ errorMessage = null;
}
public boolean isSync()
@@ -147,13 +253,8 @@
*/
public void onError(int errorCode, String errorMessage)
{
- if (tasks != null)
- {
- for (TaskHolder run : tasks)
- {
- run.task.onError(errorCode, errorMessage);
- }
- }
+ this.errorCode = errorCode;
+ this.errorMessage = errorMessage;
}
class TaskHolder
@@ -174,4 +275,5 @@
}
}
+
}
Modified: branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/SyncOperation.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/SyncOperation.java 2009-11-21 20:49:38 UTC (rev 8364)
+++ branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/journal/SyncOperation.java 2009-11-21 23:08:15 UTC (rev 8365)
@@ -13,6 +13,8 @@
package org.hornetq.core.persistence.impl.journal;
+import java.util.concurrent.Executor;
+
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.persistence.OperationContext;
@@ -125,6 +127,22 @@
ctx.replicationLineUp();
}
+ /**
+ * @see org.hornetq.core.persistence.OperationContext#setExecutor(java.util.concurrent.Executor)
+ */
+ public void setExecutor(Executor executor)
+ {
+ ctx.setExecutor(executor);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.OperationContext#reattach()
+ */
+ public void reinstall()
+ {
+ OperationContextImpl.setInstance(this);
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-11-21 20:49:38 UTC (rev 8364)
+++ branches/ClebertCallback/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-11-21 23:08:15 UTC (rev 8365)
@@ -15,6 +15,7 @@
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong;
import javax.transaction.xa.Xid;
@@ -26,6 +27,7 @@
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.persistence.GroupingInfo;
+import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.Binding;
@@ -356,4 +358,20 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#getContext()
+ */
+ public OperationContext getContext()
+ {
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#newContext()
+ */
+ public OperationContext newContext(Executor executor)
+ {
+ return null;
+ }
+
}
Modified: branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-21 20:49:38 UTC (rev 8364)
+++ branches/ClebertCallback/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-21 23:08:15 UTC (rev 8365)
@@ -917,10 +917,6 @@
}
}
}
- else
- {
- storageManager.sync();
- }
message.incrementRefCount(reference);
}
@@ -931,6 +927,8 @@
}
else
{
+ // This will use the same thread if there are no pending operations
+ // avoiding a context switch on this case
storageManager.afterCompleteOperations(new IOAsyncTask()
{
public void onError(int errorCode, String errorMessage)
Modified: branches/ClebertCallback/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-11-21 20:49:38 UTC (rev 8364)
+++ branches/ClebertCallback/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-11-21 23:08:15 UTC (rev 8365)
@@ -50,8 +50,6 @@
void appendRollbackRecord(byte journalID, long txID) throws Exception;
- void closeContext();
-
/** A list of tokens that are still waiting for replications to be completed */
Set<OperationContext> getActiveTokens();
Modified: branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-21 20:49:38 UTC (rev 8364)
+++ branches/ClebertCallback/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-11-21 23:08:15 UTC (rev 8365)
@@ -392,25 +392,7 @@
started = false;
}
- /* (non-Javadoc)
- * @see org.hornetq.core.replication.ReplicationManager#completeToken()
- */
- public void closeContext()
- {
- final OperationContext token = getContext();
- if (token != null)
- {
- // Remove from pending tokens as soon as this is complete
- if (!token.hasReplication())
- {
- sync(token);
- }
- token.complete();
- }
- }
-
-
/* method for testcases only
* @see org.hornetq.core.replication.ReplicationManager#getPendingTokens()
*/
@@ -513,7 +495,7 @@
public OperationContext getContext()
{
- return OperationContextImpl.getContext();
+ return OperationContextImpl.getInstance();
}
/**
Modified: branches/ClebertCallback/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-21 20:49:38 UTC (rev 8364)
+++ branches/ClebertCallback/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-11-21 23:08:15 UTC (rev 8365)
@@ -22,6 +22,7 @@
import java.util.Set;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -61,6 +62,7 @@
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.persistence.GroupingInfo;
+import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
@@ -646,6 +648,10 @@
}
Channel channel = connection.getChannel(channelID, sendWindowSize);
+
+ Executor sessionExecutor = executorFactory.getExecutor();
+
+ OperationContext sessionContext = storageManager.newContext(sessionExecutor);
final ServerSessionImpl session = new ServerSessionImpl(name,
username,
@@ -661,7 +667,8 @@
postOffice,
resourceManager,
securityStore,
- executorFactory.getExecutor(),
+ sessionContext,
+ sessionExecutor,
channel,
managementService,
// queueFactory,
@@ -670,7 +677,7 @@
sessions.put(name, session);
- ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session);
+ ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, sessionContext);
session.setHandler(handler);
Modified: branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-21 20:49:38 UTC (rev 8364)
+++ branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-11-21 23:08:15 UTC (rev 8365)
@@ -39,6 +39,7 @@
import org.hornetq.core.management.ManagementService;
import org.hornetq.core.management.Notification;
import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.postoffice.Binding;
@@ -187,6 +188,10 @@
private final HornetQServer server;
private final SimpleString managementAddress;
+
+ /** We always use the same operation context for the session.
+ * With that we can perform extra checks */
+ private final OperationContext sessionOperationContext;
// The current currentLargeMessage being processed
private volatile LargeServerMessage currentLargeMessage;
@@ -213,6 +218,7 @@
final PostOffice postOffice,
final ResourceManager resourceManager,
final SecurityStore securityStore,
+ final OperationContext sessionOperationContext,
final Executor executor,
final Channel channel,
final ManagementService managementService,
@@ -242,6 +248,8 @@
this.resourceManager = resourceManager;
this.securityStore = securityStore;
+
+ this.sessionOperationContext = sessionOperationContext;
this.executor = executor;
@@ -1739,8 +1747,6 @@
doSendResponse(confirmPacket, response, flush, closeChannel);
}
});
-
- storageManager.completeOperations();
}
/**
Modified: branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java 2009-11-21 20:49:38 UTC (rev 8364)
+++ branches/ClebertCallback/src/main/org/hornetq/core/server/impl/ServerSessionPacketHandler.java 2009-11-21 23:08:15 UTC (rev 8365)
@@ -45,6 +45,7 @@
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.SESS_XA_SUSPEND;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.remoting.ChannelHandler;
import org.hornetq.core.remoting.Packet;
import org.hornetq.core.remoting.impl.wireformat.CreateQueueMessage;
@@ -80,16 +81,20 @@
* @author <a href="mailto:jmesnil@redhat.com">Jeff Mesnil</a>
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:andy.taylor@jboss.org>Andy Taylor</a>
+ * @author <a href="mailto:clebert.suconic@jboss.org>Clebert Suconic</a>
*/
public class ServerSessionPacketHandler implements ChannelHandler
{
private static final Logger log = Logger.getLogger(ServerSessionPacketHandler.class);
private final ServerSession session;
+
+ private final OperationContext sessionContext;
- public ServerSessionPacketHandler(final ServerSession session)
+ public ServerSessionPacketHandler(final ServerSession session, OperationContext sessionContext)
{
this.session = session;
+ this.sessionContext = sessionContext;
}
public long getID()
@@ -101,6 +106,11 @@
{
byte type = packet.getType();
+ if (sessionContext != null)
+ {
+ sessionContext.reinstall();
+ }
+
try
{
switch (type)
@@ -289,5 +299,12 @@
{
log.error("Caught unexpected exception", t);
}
+ finally
+ {
+ if (sessionContext != null)
+ {
+ sessionContext.complete();
+ }
+ }
}
}
Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/integration/client/OrderTest.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/integration/client/OrderTest.java 2009-11-21 20:49:38 UTC (rev 8364)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/integration/client/OrderTest.java 2009-11-21 23:08:15 UTC (rev 8365)
@@ -13,18 +13,11 @@
package org.hornetq.tests.integration.client;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
import org.hornetq.core.client.ClientConsumer;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientProducer;
import org.hornetq.core.client.ClientSession;
import org.hornetq.core.client.ClientSessionFactory;
-import org.hornetq.core.client.SessionFailureListener;
-import org.hornetq.core.client.impl.ClientSessionInternal;
-import org.hornetq.core.exception.HornetQException;
-import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.tests.util.ServiceTestBase;
@@ -47,9 +40,6 @@
protected void setUp() throws Exception
{
super.setUp();
- server = createServer(true, true);
- server.getConfiguration().setJournalFileSize(10 * 1024 * 1024);
- server.start();
}
protected void tearDown() throws Exception
@@ -64,8 +54,21 @@
// Public --------------------------------------------------------
- public void testSimpleOrder() throws Exception
+ public void testSimpleOrderNoStorage() throws Exception
{
+ doTestSimpleOrder(false);
+ }
+
+ public void testSimpleOrderPersistence() throws Exception
+ {
+ doTestSimpleOrder(true);
+ }
+
+ public void doTestSimpleOrder(final boolean persistent) throws Exception
+ {
+ server = createServer(persistent, true);
+ server.start();
+
ClientSessionFactory sf = createNettyFactory();
sf.setBlockOnNonPersistentSend(false);
@@ -92,16 +95,16 @@
boolean started = false;
- for (int start = 0; start < 3; start++)
+ for (int start = 0; start < 2; start++)
{
- if (start == 2)
+ if (persistent && start == 1)
{
started = true;
server.stop();
server.start();
}
-
+
session = sf.createSession(true, true);
session.start();
@@ -142,40 +145,6 @@
}
- private void fail(ClientSession session) throws InterruptedException
- {
-
- final CountDownLatch latch = new CountDownLatch(1);
-
- class MyListener implements SessionFailureListener
- {
- public void connectionFailed(HornetQException me)
- {
- latch.countDown();
- }
-
- public void beforeReconnect(HornetQException exception)
- {
- }
- }
-
- MyListener listener = new MyListener();
- session.addFailureListener(listener);
-
- RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
-
- // Simulate failure on connection
- conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
-
- // Wait to be informed of failure
-
- boolean ok = latch.await(1000, TimeUnit.MILLISECONDS);
-
- assertTrue(ok);
-
- session.removeFailureListener(listener);
- }
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java 2009-11-21 20:49:38 UTC (rev 8364)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationOrderTest.java 2009-11-21 23:08:15 UTC (rev 8365)
@@ -52,17 +52,6 @@
// Public --------------------------------------------------------
- public void test() throws Exception
- {
- for (int i = 0; i < 100; i++)
- {
- System.out.println("<<<<<< " + i + " >>>>>>>");
- testMixedPersistentAndNonPersistentMessagesOrderWithReplicatedBackup();
- tearDown();
- setUp();
- }
- }
-
public void testMixedPersistentAndNonPersistentMessagesOrderWithReplicatedBackup() throws Exception
{
doTestMixedPersistentAndNonPersistentMessagesOrderWithReplicatedBackup(false);
Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-21 20:49:38 UTC (rev 8364)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-11-21 23:08:15 UTC (rev 8365)
@@ -51,6 +51,7 @@
import org.hornetq.core.paging.impl.PagedMessageImpl;
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
+import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.remoting.Interceptor;
@@ -368,7 +369,6 @@
Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
- Thread.sleep(100);
TestInterceptor.value.set(false);
for (int i = 0; i < 500; i++)
@@ -377,7 +377,7 @@
}
final CountDownLatch latch = new CountDownLatch(1);
- OperationContextImpl.getContext().executeOnCompletion(new IOAsyncTask()
+ OperationContextImpl.getInstance().executeOnCompletion(new IOAsyncTask()
{
public void onError(int errorCode, String errorMessage)
@@ -390,8 +390,6 @@
}
});
- manager.closeContext();
-
server.stop();
assertTrue(latch.await(50, TimeUnit.SECONDS));
@@ -409,7 +407,7 @@
private void blockOnReplication(ReplicationManagerImpl manager) throws Exception
{
final CountDownLatch latch = new CountDownLatch(1);
- OperationContextImpl.getContext().executeOnCompletion(new IOAsyncTask()
+ OperationContextImpl.getInstance().executeOnCompletion(new IOAsyncTask()
{
public void onError(int errorCode, String errorMessage)
@@ -422,8 +420,6 @@
}
});
- manager.closeContext();
-
assertTrue(latch.await(30, TimeUnit.SECONDS));
}
@@ -468,7 +464,7 @@
replicatedJournal.appendPrepareRecord(1, new FakeData(), false);
final CountDownLatch latch = new CountDownLatch(1);
- OperationContextImpl.getContext().executeOnCompletion(new IOAsyncTask()
+ OperationContextImpl.getInstance().executeOnCompletion(new IOAsyncTask()
{
public void onError(int errorCode, String errorMessage)
@@ -481,8 +477,6 @@
}
});
- manager.closeContext();
-
assertTrue(latch.await(1, TimeUnit.SECONDS));
assertEquals(0, manager.getActiveTokens().size());
@@ -521,6 +515,8 @@
final CountDownLatch latch = new CountDownLatch(numberOfAdds);
+ OperationContext ctx = OperationContextImpl.getInstance();
+
for (int i = 0; i < numberOfAdds; i++)
{
final int nAdd = i;
@@ -529,12 +525,8 @@
{
replicatedJournal.appendPrepareRecord(i, new FakeData(), false);
}
- else
- {
- manager.sync(OperationContextImpl.getContext());
- }
- OperationContextImpl.getContext().executeOnCompletion(new IOAsyncTask()
+ ctx.executeOnCompletion(new IOAsyncTask()
{
public void onError(int errorCode, String errorMessage)
@@ -543,12 +535,11 @@
public void done()
{
+ System.out.println("Add " + nAdd);
executions.add(nAdd);
latch.countDown();
}
});
-
- manager.closeContext();
}
assertTrue(latch.await(10, TimeUnit.SECONDS));
Modified: branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-21 20:49:38 UTC (rev 8364)
+++ branches/ClebertCallback/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-11-21 23:08:15 UTC (rev 8365)
@@ -44,6 +44,7 @@
import org.hornetq.core.paging.impl.PagingStoreImpl;
import org.hornetq.core.paging.impl.TestSupportPageStore;
import org.hornetq.core.persistence.GroupingInfo;
+import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.Binding;
@@ -1247,6 +1248,22 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#getContext()
+ */
+ public OperationContext getContext()
+ {
+ return null;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#newContext(java.util.concurrent.Executor)
+ */
+ public OperationContext newContext(Executor executor)
+ {
+ return null;
+ }
+
}
class FakeStoreFactory implements PagingStoreFactory