[jboss-cvs] JBoss Messaging SVN: r5248 - in trunk: src/main/org/jboss/messaging/core/postoffice/impl and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Nov 3 12:33:11 EST 2008
Author: timfox
Date: 2008-11-03 12:33:11 -0500 (Mon, 03 Nov 2008)
New Revision: 5248
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/server/SendLock.java
trunk/src/main/org/jboss/messaging/core/server/impl/SendLockImpl.java
trunk/tests/bin/runtest
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
Log:
More fixing tests, failover etc
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-11-03 17:15:30 UTC (rev 5247)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-11-03 17:33:11 UTC (rev 5248)
@@ -126,7 +126,7 @@
private final Executor executor;
private volatile RemotingConnection remotingConnection;
-
+
private volatile RemotingConnection backupConnection;
private final Map<Long, ClientProducerInternal> producers = new ConcurrentHashMap<Long, ClientProducerInternal>();
@@ -157,7 +157,7 @@
private final IDGenerator idGenerator = new SimpleIDGenerator(0);
private volatile boolean started;
-
+
// Constructors ----------------------------------------------------------------------------
public ClientSessionImpl(final ClientSessionFactoryInternal sessionFactory,
@@ -179,7 +179,7 @@
this.name = name;
this.remotingConnection = remotingConnection;
-
+
this.backupConnection = backupConnection;
this.connectionFactory = connectionFactory;
@@ -209,7 +209,7 @@
this.channel = channel;
- this.version = version;
+ this.version = version;
}
// ClientSession implementation
@@ -220,7 +220,7 @@
final SimpleString filterString,
final boolean durable,
final boolean temp) throws MessagingException
- {
+ {
checkClosed();
SessionCreateQueueMessage request = new SessionCreateQueueMessage(address, queueName, filterString, durable, temp);
@@ -229,14 +229,14 @@
}
public void deleteQueue(final SimpleString queueName) throws MessagingException
- {
+ {
checkClosed();
channel.sendBlocking(new SessionDeleteQueueMessage(queueName));
}
public SessionQueueQueryResponseMessage queueQuery(final SimpleString queueName) throws MessagingException
- {
+ {
checkClosed();
SessionQueueQueryMessage request = new SessionQueueQueryMessage(queueName);
@@ -247,7 +247,7 @@
}
public SessionBindingQueryResponseMessage bindingQuery(final SimpleString address) throws MessagingException
- {
+ {
checkClosed();
SessionBindingQueryMessage request = new SessionBindingQueryMessage(address);
@@ -258,7 +258,7 @@
}
public void addDestination(final SimpleString address, final boolean durable, final boolean temp) throws MessagingException
- {
+ {
checkClosed();
SessionAddDestinationMessage request = new SessionAddDestinationMessage(address, durable, temp);
@@ -267,7 +267,7 @@
}
public void removeDestination(final SimpleString address, final boolean durable) throws MessagingException
- {
+ {
checkClosed();
SessionRemoveDestinationMessage request = new SessionRemoveDestinationMessage(address, durable);
@@ -276,7 +276,7 @@
}
public ClientConsumer createConsumer(final SimpleString queueName) throws MessagingException
- {
+ {
checkClosed();
return createConsumer(queueName, null, false);
@@ -285,14 +285,15 @@
public ClientConsumer createConsumer(final SimpleString queueName,
final SimpleString filterString,
final boolean direct) throws MessagingException
- {
+ {
checkClosed();
return createConsumer(queueName,
filterString,
direct,
connectionFactory.getConsumerWindowSize(),
- connectionFactory.getConsumerMaxRate(), false);
+ connectionFactory.getConsumerMaxRate(),
+ false);
}
public ClientConsumer createConsumer(final SimpleString queueName,
@@ -300,11 +301,12 @@
final boolean direct,
final boolean browseOnly) throws MessagingException
{
- return createConsumer(queueName,
+ return createConsumer(queueName,
filterString,
direct,
connectionFactory.getConsumerWindowSize(),
- connectionFactory.getConsumerMaxRate(), browseOnly);
+ connectionFactory.getConsumerMaxRate(),
+ browseOnly);
}
public ClientConsumer createConsumer(final SimpleString queueName,
@@ -313,7 +315,7 @@
final int windowSize,
final int maxRate,
final boolean browseOnly) throws MessagingException
- {
+ {
checkClosed();
SessionCreateConsumerMessage request = new SessionCreateConsumerMessage(queueName,
@@ -372,7 +374,7 @@
}
public ClientProducer createProducer(final SimpleString address) throws MessagingException
- {
+ {
checkClosed();
return createProducer(address, connectionFactory.getProducerMaxRate());
@@ -380,7 +382,7 @@
public ClientProducer createProducer(final SimpleString address, final int maxRate) throws MessagingException
{
- return createProducer(address,
+ return createProducer(address,
maxRate,
connectionFactory.isBlockOnNonPersistentSend(),
connectionFactory.isBlockOnPersistentSend());
@@ -390,7 +392,7 @@
final int maxRate,
final boolean blockOnNonPersistentSend,
final boolean blockOnPersistentSend) throws MessagingException
- {
+ {
checkClosed();
ClientProducerInternal producer = null;
@@ -402,9 +404,7 @@
if (producer == null)
{
- SessionCreateProducerMessage request = new SessionCreateProducerMessage(address,
- maxRate,
- autoGroupId);
+ SessionCreateProducerMessage request = new SessionCreateProducerMessage(address, maxRate, autoGroupId);
SessionCreateProducerResponseMessage response = (SessionCreateProducerResponseMessage)channel.sendBlocking(request);
@@ -422,7 +422,7 @@
false),
autoCommitSends && blockOnNonPersistentSend,
autoCommitSends && blockOnPersistentSend,
- response.getAutoGroupId(),
+ response.getAutoGroupId(),
channel);
}
@@ -447,10 +447,10 @@
{
checkClosed();
- //We do a "JMS style" rollback where the session is stopped, and the buffer is cancelled back
- //first before rolling back
- //This ensures messages are received in the same order after rollback w.r.t. to messages in the buffer
- //For core we could just do a straight rollback, it really depends if we want JMS style semantics or not...
+ // We do a "JMS style" rollback where the session is stopped, and the buffer is cancelled back
+ // first before rolling back
+ // This ensures messages are received in the same order after rollback w.r.t. to messages in the buffer
+ // For core we could just do a straight rollback, it really depends if we want JMS style semantics or not...
boolean wasStarted = started;
@@ -529,7 +529,7 @@
}
public void start() throws MessagingException
- {
+ {
checkClosed();
if (!started)
@@ -541,7 +541,7 @@
}
public void stop() throws MessagingException
- {
+ {
checkClosed();
if (started)
@@ -577,7 +577,7 @@
// This acknowledges all messages received by the consumer so far
public void acknowledge(final long consumerID, final long messageID) throws MessagingException
- {
+ {
checkClosed();
SessionAcknowledgeMessage message = new SessionAcknowledgeMessage(consumerID, messageID, blockOnAcknowledge);
@@ -643,7 +643,7 @@
}
public void close() throws MessagingException
- {
+ {
if (closed)
{
return;
@@ -675,14 +675,14 @@
doCleanup();
}
- //Needs to be synchronized to prevent issues with occurring concurrently with close()
+ // Needs to be synchronized to prevent issues with occurring concurrently with close()
public synchronized void handleFailover()
{
if (closed)
{
- return ;
+ return;
}
-
+
// We lock the channel to prevent any packets to be added to the resend
// cache during the failover process
channel.lock();
@@ -698,9 +698,9 @@
Packet request = new ReattachSessionMessage(name, channel.getLastReceivedCommandID());
Channel channel1 = backupConnection.getChannel(1, -1, false);
-
- ReattachSessionResponseMessage response = (ReattachSessionResponseMessage)channel1.sendBlocking(request);
+ ReattachSessionResponseMessage response = (ReattachSessionResponseMessage)channel1.sendBlocking(request);
+
if (!response.isRemoved())
{
channel.replayCommands(response.getLastReceivedCommandID());
@@ -711,7 +711,7 @@
// closed on the server so we need to interrupt it
channel.returnBlocking();
}
-
+
backupConnection = null;
}
catch (Throwable t)
@@ -724,8 +724,8 @@
}
channel.send(new SessionFailoverCompleteMessage(name));
-
- //Now we can add a failure listener since if a further failure occurs we cleanup since no backup any more
+
+ // Now we can add a failure listener since if a further failure occurs we cleanup since no backup any more
remotingConnection.addFailureListener(this);
}
@@ -996,7 +996,7 @@
// FailureListener implementation --------------------------------------------
public void connectionFailed(final MessagingException me)
- {
+ {
try
{
cleanUp();
@@ -1004,7 +1004,7 @@
catch (Exception e)
{
log.error("Failed to cleanup session");
- }
+ }
}
// Public
@@ -1019,7 +1019,7 @@
{
return remotingConnection;
}
-
+
public RemotingConnection getBackupConnection()
{
return backupConnection;
@@ -1057,16 +1057,16 @@
{
producerCache.clear();
}
-
+
remotingConnection.removeFailureListener(this);
synchronized (this)
{
closed = true;
-
- channel.close();
- }
-
+
+ channel.close();
+ }
+
sessionFactory.removeSession(this);
}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-11-03 17:15:30 UTC (rev 5247)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-11-03 17:33:11 UTC (rev 5248)
@@ -137,6 +137,14 @@
pagingManager.stop();
addressManager.clear();
+
+ //Release all the locks
+ for (SendLock lock: addressLocks.values())
+ {
+ lock.close();
+ }
+
+ addressLocks.clear();
started = false;
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-11-03 17:15:30 UTC (rev 5247)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-11-03 17:33:11 UTC (rev 5248)
@@ -210,17 +210,17 @@
private boolean idGeneratorSynced = false;
private final Object transferLock = new Object();
-
+
private final ChannelHandler ppHandler = new PingPongHandler();
-
+
private boolean frozen;
-
+
private final Object failLock = new Object();
-
+
// debug only stuff
private boolean createdActive;
-
+
// Constructors
// ---------------------------------------------------------------------------------
@@ -239,22 +239,15 @@
/*
* Create a server side connection
*/
- public RemotingConnectionImpl(final Connection transportConnection,
+ public RemotingConnectionImpl(final Connection transportConnection,
final List<Interceptor> interceptors,
final RemotingConnection replicatingConnection,
final boolean active)
{
- this(transportConnection,
- -1,
- -1,
- null,
- interceptors,
- replicatingConnection,
- active,
- false);
+ this(transportConnection, -1, -1, null, interceptors, replicatingConnection, active, false);
}
-
+
private RemotingConnectionImpl(final Connection transportConnection,
final long blockingCallTimeout,
final long pingPeriod,
@@ -313,9 +306,7 @@
return transportConnection.getID();
}
- public synchronized Channel getChannel(final long channelID,
- final int windowSize,
- final boolean block)
+ public synchronized Channel getChannel(final long channelID, final int windowSize, final boolean block)
{
ChannelImpl channel = channels.get(channelID);
@@ -368,16 +359,16 @@
}
log.warn(me.getMessage());
-
+
// Then call the listeners
callListeners(me);
-
+
internalClose();
for (Channel channel : channels.values())
{
channel.fail();
- }
+ }
}
}
@@ -436,13 +427,13 @@
public void bufferReceived(final Object connectionID, final MessagingBuffer buffer)
{
final Packet packet = decode(buffer);
-
+
synchronized (transferLock)
{
if (!frozen)
{
final ChannelImpl channel = channels.get(packet.getChannelID());
-
+
if (channel != null)
{
channel.handlePacket(packet);
@@ -455,17 +446,17 @@
{
active = true;
}
-
+
public void freeze()
{
- //Prevent any more packets being handled on this connection
-
+ // Prevent any more packets being handled on this connection
+
synchronized (transferLock)
{
frozen = true;
}
}
-
+
// Package protected
// ----------------------------------------------------------------------------
@@ -519,7 +510,7 @@
channel.close();
}
}
-
+
private Packet decode(final MessagingBuffer in)
{
final byte packetType = in.getByte();
@@ -844,13 +835,13 @@
private boolean failingOver;
private final Queue<DelayedResult> responseActions = new ConcurrentLinkedQueue<DelayedResult>();
-
+
private final int windowSize;
-
+
private final int confWindowSize;
-
+
private final Semaphore sendSemaphore;
-
+
private int receivedBytes;
private ChannelImpl(final RemotingConnectionImpl connection,
@@ -866,22 +857,22 @@
{
// Don't want to send confirmations if replicating to backup
this.windowSize = -1;
-
+
this.confWindowSize = -1;
-
- //We don't redirect the ping channel
-
+
+ // We don't redirect the ping channel
+
if (id != 0)
{
replicatingChannel = connection.replicatingConnection.getChannel(id, -1, false);
-
+
replicatingChannel.setHandler(new ReplicatedPacketsConfirmedChannelHandler());
}
}
else
{
this.windowSize = windowSize;
-
+
this.confWindowSize = (int)(0.75 * windowSize);
replicatingChannel = null;
@@ -890,7 +881,7 @@
if (this.windowSize != -1)
{
resendCache = new ConcurrentLinkedQueue<Packet>();
-
+
if (block)
{
sendSemaphore = new Semaphore(windowSize, true);
@@ -903,7 +894,7 @@
else
{
resendCache = null;
-
+
sendSemaphore = null;
}
}
@@ -917,7 +908,7 @@
{
return lastReceivedCommandID;
}
-
+
public Lock getLock()
{
return lock;
@@ -945,9 +936,26 @@
synchronized (sendLock)
{
packet.setChannelID(id);
-
+
+ final MessagingBuffer buffer = connection.transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
+
+ int size = packet.encode(buffer);
+
+ // Must block on semaphore outside the main lock or this can prevent failover from occurring
+ if (sendSemaphore != null)
+ {
+ try
+ {
+ sendSemaphore.acquire(size);
+ }
+ catch (InterruptedException e)
+ {
+ throw new IllegalStateException("Semaphore interrupted");
+ }
+ }
+
lock.lock();
-
+
try
{
while (failingOver)
@@ -961,8 +969,16 @@
{
}
}
-
- addToCacheAndWrite(packet, true);
+
+ if (resendCache != null && packet.isRequiresConfirmations())
+ {
+ resendCache.add(packet);
+ }
+
+ if (connection.active || packet.isWriteAlways())
+ {
+ connection.transportConnection.write(buffer);
+ }
}
finally
{
@@ -978,7 +994,7 @@
{
throw new MessagingException(MessagingException.NOT_CONNECTED, "Connection is destroyed");
}
-
+
if (connection.blockingCallTimeout == -1)
{
throw new IllegalStateException("Cannot do a blocking call timeout on a server side connection");
@@ -986,6 +1002,23 @@
packet.setChannelID(id);
+ final MessagingBuffer buffer = connection.transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
+
+ int size = packet.encode(buffer);
+
+ // Must block on semaphore outside the main lock or this can prevent failover from occurring
+ if (sendSemaphore != null)
+ {
+ try
+ {
+ sendSemaphore.acquire(size);
+ }
+ catch (InterruptedException e)
+ {
+ throw new IllegalStateException("Semaphore interrupted");
+ }
+ }
+
lock.lock();
try
@@ -1003,9 +1036,14 @@
}
response = null;
-
- addToCacheAndWrite(packet, false);
-
+
+ if (resendCache != null && packet.isRequiresConfirmations())
+ {
+ resendCache.add(packet);
+ }
+
+ connection.transportConnection.write(buffer);
+
long toWait = connection.blockingCallTimeout;
long start = System.currentTimeMillis();
@@ -1017,7 +1055,7 @@
sendCondition.await(toWait, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e)
- {
+ {
}
final long now = System.currentTimeMillis();
@@ -1125,20 +1163,20 @@
}
public void fail()
- {
+ {
}
public Channel getReplicatingChannel()
{
return replicatingChannel;
}
-
+
public void transferConnection(final RemotingConnection newConnection)
{
// Needs to synchronize on the connection to make sure no packets from
// the old connection get processed after transfer has occurred
synchronized (connection.transferLock)
- {
+ {
connection.channels.remove(id);
// And switch it
@@ -1185,7 +1223,7 @@
lock.unlock();
}
-
+
public RemotingConnection getConnection()
{
return connection;
@@ -1265,30 +1303,30 @@
}
}
}
-
+
private void doWrite(final Packet packet)
- {
+ {
final MessagingBuffer buffer = connection.transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
packet.encode(buffer);
-
+
connection.transportConnection.write(buffer);
}
-
+
private void checkConfirmation(final Packet packet)
{
if (resendCache != null && packet.isRequiresConfirmations())
{
lastReceivedCommandID++;
-
+
receivedBytes += packet.getPacketSize();
-
+
if (receivedBytes >= confWindowSize)
{
final Packet confirmed = new PacketsConfirmedMessage(lastReceivedCommandID);
confirmed.setChannelID(id);
-
+
receivedBytes = 0;
doWrite(confirmed);
@@ -1296,35 +1334,6 @@
}
}
- private void addToCacheAndWrite(final Packet packet, final boolean checkActive)
- {
- final MessagingBuffer buffer = connection.transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
-
- int size = packet.encode(buffer);
-
- if (resendCache != null && packet.isRequiresConfirmations())
- {
- resendCache.add(packet);
-
- if (sendSemaphore != null)
- {
- try
- {
- sendSemaphore.acquire(size);
- }
- catch (InterruptedException e)
- {
- throw new IllegalStateException("Semaphore interrupted");
- }
- }
- }
-
- if (!checkActive || connection.active || packet.isWriteAlways())
- {
- connection.transportConnection.write(buffer);
- }
- }
-
private void clearUpTo(final int lastReceivedCommandID)
{
final int numberToClear = 1 + lastReceivedCommandID - firstStoredCommandID;
@@ -1333,7 +1342,7 @@
{
throw new IllegalArgumentException("Invalid lastReceivedCommandID: " + lastReceivedCommandID);
}
-
+
int sizeToFree = 0;
for (int i = 0; i < numberToClear; i++)
@@ -1357,12 +1366,12 @@
" created active " +
connection.createdActive);
}
-
+
sizeToFree += packet.getPacketSize();
}
firstStoredCommandID += numberToClear;
-
+
if (sendSemaphore != null)
{
sendSemaphore.release(sizeToFree);
@@ -1370,21 +1379,21 @@
}
private class ReplicatedPacketsConfirmedChannelHandler implements ChannelHandler
- {
+ {
public void handlePacket(final Packet packet)
{
switch (packet.getType())
- {
+ {
case PACKETS_CONFIRMED:
{
doWrite(packet);
-
+
break;
}
case REPLICATION_RESPONSE:
{
replicateResponseReceived();
-
+
break;
}
default:
@@ -1404,8 +1413,9 @@
{
// Error - didn't get pong back
final MessagingException me = new MessagingException(MessagingException.NOT_CONNECTED,
- "Did not receive pong from server, active " +
- createdActive + " client " + client);
+ "Did not receive pong from server, active " + createdActive +
+ " client " +
+ client);
fail(me);
}
@@ -1430,7 +1440,7 @@
if (type == PONG)
{
gotPong = true;
-
+
if (stopPinging)
{
future.cancel(true);
@@ -1442,7 +1452,7 @@
// Parameter is placeholder for future
final Packet pong = new Pong(-1);
-
+
pingChannel.send(pong);
}
else
Modified: trunk/src/main/org/jboss/messaging/core/server/SendLock.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/SendLock.java 2008-11-03 17:15:30 UTC (rev 5247)
+++ trunk/src/main/org/jboss/messaging/core/server/SendLock.java 2008-11-03 17:33:11 UTC (rev 5248)
@@ -41,4 +41,6 @@
void beforeSend();
void afterSend();
+
+ void close();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/SendLockImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/SendLockImpl.java 2008-11-03 17:15:30 UTC (rev 5247)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/SendLockImpl.java 2008-11-03 17:33:11 UTC (rev 5248)
@@ -40,14 +40,26 @@
private boolean locked;
private int count;
+
+ private boolean closed;
public synchronized void lock()
{
+ if (closed)
+ {
+ return;
+ }
+
while (count > 0 || locked)
{
try
{
wait();
+
+ if (closed)
+ {
+ return;
+ }
}
catch (InterruptedException e)
{
@@ -59,6 +71,11 @@
public synchronized void unlock()
{
+ if (closed)
+ {
+ return;
+ }
+
locked = false;
notifyAll();
@@ -66,11 +83,21 @@
public synchronized void beforeSend()
{
+ if (closed)
+ {
+ return;
+ }
+
while (locked)
{
try
{
wait();
+
+ if (closed)
+ {
+ return;
+ }
}
catch (InterruptedException e)
{
@@ -82,6 +109,11 @@
public synchronized void afterSend()
{
+ if (closed)
+ {
+ return;
+ }
+
count--;
if (count < 0)
@@ -94,5 +126,12 @@
notifyAll();
}
}
+
+ public synchronized void close()
+ {
+ closed = true;
+
+ notifyAll();
+ }
}
Modified: trunk/tests/bin/runtest
===================================================================
--- trunk/tests/bin/runtest 2008-11-03 17:15:30 UTC (rev 5247)
+++ trunk/tests/bin/runtest 2008-11-03 17:33:11 UTC (rev 5248)
@@ -133,7 +133,7 @@
fi
JAVA_OPTS="-Xmx1024M $JAVA_OPTS \
--Dmodule.output=$reldir/../../ \
+-Dcom.sun.management.jmxremote -Dmodule.output=$reldir/../../ \
$REMOTE_TEST \
-Dtest.database=$TEST_DATABASE \
-Dtest.serialization=$TEST_SERIALIZATION \
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java 2008-11-03 17:15:30 UTC (rev 5247)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java 2008-11-03 17:33:11 UTC (rev 5248)
@@ -1194,7 +1194,7 @@
protected int getNumIterations()
{
- return 20;
+ return 50;
}
protected void setUp() throws Exception
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java 2008-11-03 17:15:30 UTC (rev 5247)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/paging/impl/PageManagerImplTest.java 2008-11-03 17:33:11 UTC (rev 5248)
@@ -67,136 +67,8 @@
// Public --------------------------------------------------------
- public void testGetStore() throws Exception
- {
- HierarchicalRepository<QueueSettings> queueSettings = new HierarchicalObjectRepository<QueueSettings>();
- queueSettings.setDefault(new QueueSettings());
+
- PagingStoreFactory spi = EasyMock.createMock(PagingStoreFactory.class);
- PagingManagerImpl manager = new PagingManagerImpl(spi, null, queueSettings, -1);
-
- SimpleString destination = new SimpleString("some-destination");
-
- try
- {
- manager.getPageStore(destination);
- fail("supposed to throw an exception");
- }
- catch (Exception ignored)
- {
- }
-
- manager.start();
-
- PagingStore store = EasyMock.createNiceMock(PagingStore.class);
-
- EasyMock.expect(spi.newStore(EasyMock.eq(destination), EasyMock.isA(QueueSettings.class))).andReturn(store);
-
- store.start();
-
- EasyMock.replay(spi, store);
-
- assertEquals(store, manager.getPageStore(destination));
-
- EasyMock.verify(spi, store);
-
- EasyMock.reset(spi, store);
-
- EasyMock.replay(spi, store);
-
- // it should use the cached store, so nothing else should be called on any
- // SPI
- assertEquals(store, manager.getPageStore(destination));
-
- EasyMock.verify(spi, store);
-
- EasyMock.reset(spi, store);
-
- store.stop();
-
- EasyMock.replay(spi, store);
-
- manager.stop();
-
- EasyMock.verify(spi, store);
-
- }
-
- public void testMultipleThreadsGetStore() throws Exception
- {
- PagingStoreFactory spi = EasyMock.createMock(PagingStoreFactory.class);
- final PagingManagerImpl manager = new PagingManagerImpl(spi, null, repoSettings, -1);
-
- final SimpleString destination = new SimpleString("some-destination");
-
- final SequentialFileFactory factory = EasyMock.createNiceMock(SequentialFileFactory.class);
-
- EasyMock.expect(factory.listFiles(EasyMock.isA(String.class))).andStubReturn(new ArrayList<String>());
-
- PagingStoreImpl storeImpl = new PagingStoreImpl(manager,
- factory,
- destination,
- new QueueSettings(),
- Executors.newSingleThreadExecutor());
-
- EasyMock.expect(spi.newStore(EasyMock.eq(destination), EasyMock.isA(QueueSettings.class)))
- .andStubReturn(storeImpl);
-
- EasyMock.replay(spi, factory);
-
- manager.start();
-
- int NUMBER_OF_THREADS = 100;
-
- final CountDownLatch latchPositioned = new CountDownLatch(NUMBER_OF_THREADS);
- final CountDownLatch latchReady = new CountDownLatch(1);
-
- class GetPageThread extends Thread
- {
- Exception e;
-
- @Override
- public void run()
- {
- try
- {
- latchPositioned.countDown();
- latchReady.await();
- manager.getPageStore(destination);
-
- }
- catch (Exception e)
- {
- e.printStackTrace();
- this.e = e;
- }
-
- }
- }
-
- GetPageThread threads[] = new GetPageThread[NUMBER_OF_THREADS];
- for (int i = 0; i < NUMBER_OF_THREADS; i++)
- {
- threads[i] = new GetPageThread();
- threads[i].start();
- }
-
- latchPositioned.await();
- latchReady.countDown();
-
- for (GetPageThread thread : threads)
- {
- thread.join();
- if (thread.e != null)
- {
- throw thread.e;
- }
- }
-
- EasyMock.verify(spi, factory);
-
- }
-
public void testOnDepage() throws Exception
{
long time = System.currentTimeMillis() + 10000;
More information about the jboss-cvs-commits
mailing list