[jboss-cvs] JBoss Messaging SVN: r5690 - in trunk: src/main/org/jboss/messaging/core/paging/impl and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jan 22 15:03:37 EST 2009
Author: clebert.suconic at jboss.com
Date: 2009-01-22 15:03:36 -0500 (Thu, 22 Jan 2009)
New Revision: 5690
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReplicateDeliveryMessage.java
trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
Log:
Tweaks on paging & failover
Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java 2009-01-22 14:37:22 UTC (rev 5689)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingManager.java 2009-01-22 20:03:36 UTC (rev 5690)
@@ -73,7 +73,7 @@
* @return
* @throws Exception
*/
- PagingStore createPageStore(SimpleString destination, boolean createDir) throws Exception;
+ PagingStore createPageStore(SimpleString destination) throws Exception;
/** To return the PageStore associated with the address */
PagingStore getPageStore(SimpleString address) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2009-01-22 14:37:22 UTC (rev 5689)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2009-01-22 20:03:36 UTC (rev 5690)
@@ -136,14 +136,14 @@
/* (non-Javadoc)
* @see org.jboss.messaging.core.paging.PagingManager#reloadStores()
*/
- public void reloadStores() throws Exception
+ public synchronized void reloadStores() throws Exception
{
List<PagingStore> destinations = pagingStoreFactory.reloadStores(queueSettingsRepository);
for (PagingStore store: destinations)
{
- stores.put(store.getStoreName(), store);
store.start();
+ stores.put(store.getStoreName(), store);
}
}
@@ -152,7 +152,7 @@
* @param destination
* @return
*/
- public synchronized PagingStore createPageStore(final SimpleString storeName, final boolean createDir) throws Exception
+ public synchronized PagingStore createPageStore(final SimpleString storeName) throws Exception
{
PagingStore store = stores.get(storeName);
@@ -160,16 +160,9 @@
{
store = newStore(storeName);
- PagingStore oldStore = stores.putIfAbsent(storeName, store);
+ store.start();
- if (oldStore != null)
- {
- store = oldStore;
- }
- else
- {
- store.start();
- }
+ stores.put(storeName, store);
}
return store;
@@ -181,7 +174,7 @@
if (store == null)
{
- store = createPageStore(storeName, true);
+ store = createPageStore(storeName);
}
return store;
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-01-22 14:37:22 UTC (rev 5689)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-01-22 20:03:36 UTC (rev 5690)
@@ -303,7 +303,7 @@
(maxGlobalSize - pagingManager.getDefaultPageSize()));
}
- if (pagingManager.isGlobalPageMode() && currentGlobalSize < maxGlobalSize - pagingManager.getDefaultPageSize())
+ if (maxGlobalSize > 0 && pagingManager.isGlobalPageMode() && currentGlobalSize < maxGlobalSize - pagingManager.getDefaultPageSize())
{
pagingManager.startGlobalDepage();
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReplicateDeliveryMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReplicateDeliveryMessage.java 2009-01-22 14:37:22 UTC (rev 5689)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReplicateDeliveryMessage.java 2009-01-22 20:03:36 UTC (rev 5690)
@@ -13,8 +13,6 @@
package org.jboss.messaging.core.remoting.impl.wireformat;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.util.DataConstants;
-import org.jboss.messaging.util.SimpleString;
/**
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -30,21 +28,17 @@
private long messageID;
- private SimpleString address;
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public SessionReplicateDeliveryMessage(final long consumerID, final long messageID, final SimpleString address)
+ public SessionReplicateDeliveryMessage(final long consumerID, final long messageID)
{
super(SESS_REPLICATE_DELIVERY);
this.consumerID = consumerID;
this.messageID = messageID;
-
- this.address = address;
}
public SessionReplicateDeliveryMessage()
@@ -64,18 +58,11 @@
return messageID;
}
- public SimpleString getAddress()
- {
- return address;
- }
-
public void encodeBody(final MessagingBuffer buffer)
{
buffer.putLong(consumerID);
buffer.putLong(messageID);
-
- buffer.putSimpleString(address);
}
public void decodeBody(final MessagingBuffer buffer)
@@ -83,20 +70,13 @@
consumerID = buffer.getLong();
messageID = buffer.getLong();
-
- address = buffer.getSimpleString();
}
-
+
public boolean isRequiresConfirmations()
- {
+ {
return false;
}
- public int getRequiredBufferSize()
- {
- return BASIC_PACKET_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_LONG + SimpleString.sizeofString(address);
- }
-
public boolean equals(Object other)
{
if (other instanceof SessionReplicateDeliveryMessage == false)
@@ -108,7 +88,7 @@
return super.equals(other) && this.consumerID == r.consumerID && this.messageID == r.messageID;
}
-
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java 2009-01-22 14:37:22 UTC (rev 5689)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerConsumer.java 2009-01-22 20:03:36 UTC (rev 5690)
@@ -57,7 +57,7 @@
void failedOver();
- void deliverReplicated(SimpleString address, long messageID) throws Exception;
+ void deliverReplicated(long messageID) throws Exception;
void lock();
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-01-22 14:37:22 UTC (rev 5689)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-01-22 20:03:36 UTC (rev 5690)
@@ -88,6 +88,8 @@
private final long id;
private final Queue messageQueue;
+
+ private final SimpleString bindingAddress;
private final Filter filter;
@@ -115,7 +117,7 @@
private final boolean browseOnly;
private final StorageManager storageManager;
-
+
private final PagingManager pagingManager;
private final java.util.Queue<MessageReference> deliveringRefs = new ConcurrentLinkedQueue<MessageReference>();
@@ -130,6 +132,7 @@
public ServerConsumerImpl(final long id,
final ServerSession session,
+ final SimpleString bindingAddress,
final Queue messageQueue,
final Filter filter,
final boolean started,
@@ -143,6 +146,8 @@
this.id = id;
this.messageQueue = messageQueue;
+
+ this.bindingAddress = bindingAddress;
this.filter = filter;
@@ -159,7 +164,7 @@
this.channel = channel;
this.preAcknowledge = preAcknowledge;
-
+
this.pagingManager = pagingManager;
messageQueue.addConsumer(this);
@@ -361,7 +366,7 @@
else
{
ref.getQueue().acknowledge(tx, ref);
-
+
// Del count is not actually updated in storage unless it's
// cancelled
ref.incrementDeliveryCount();
@@ -409,11 +414,11 @@
return ref;
}
- public void deliverReplicated(final SimpleString address, final long messageID) throws Exception
+ public void deliverReplicated(final long messageID) throws Exception
{
// It may not be the first in the queue - since there may be multiple producers
// sending to the queue
- MessageReference ref = removeReferenceOnBackup(address, messageID);
+ MessageReference ref = removeReferenceOnBackup(messageID);
if (ref == null)
{
@@ -459,7 +464,7 @@
// Private --------------------------------------------------------------------------------------
- private MessageReference removeReferenceOnBackup(SimpleString address, long id) throws Exception
+ private MessageReference removeReferenceOnBackup(long id) throws Exception
{
// most of the times, the remove will work ok, so we first try it without any locks
@@ -467,9 +472,9 @@
if (ref == null)
{
- PagingStore store = pagingManager.getPageStore(address);
+ PagingStore store = pagingManager.getPageStore(bindingAddress);
- for (;;)
+ while (true)
{
// Can't have the same store being depaged in more than one thread
synchronized (store)
@@ -478,8 +483,9 @@
ref = messageQueue.removeReferenceWithID(id);
if (ref == null)
{
+ System.out.println("Forcing depage");
// force a depage
- if (!store.readPage())
+ if (!store.readPage()) // This returns false if there are no pages
{
break;
}
@@ -558,8 +564,8 @@
return HandleStatus.BUSY;
}
- //TODO use a null or boolean check here for performance
-
+ // note: Since we schedule deliveries to start under replication, we use a counter of pendingLargeMessages.
+
// If there is a pendingLargeMessage we can't take another message
// This has to be checked inside the lock as the set to null is done inside the lock
if (pendingLargeMessagesCounter.get() > 0)
@@ -620,9 +626,7 @@
final LargeMessageDeliverer localDeliverer = new LargeMessageDeliverer((LargeServerMessage)message, ref);
- DelayedResult result = channel.replicatePacket(new SessionReplicateDeliveryMessage(id,
- message.getMessageID(),
- message.getDestination()));
+ DelayedResult result = channel.replicatePacket(new SessionReplicateDeliveryMessage(id, message.getMessageID()));
if (result == null)
{
@@ -670,9 +674,7 @@
final SessionReceiveMessage packet = new SessionReceiveMessage(id, message, ref.getDeliveryCount() + 1);
- DelayedResult result = channel.replicatePacket(new SessionReplicateDeliveryMessage(id,
- message.getMessageID(),
- message.getDestination()));
+ DelayedResult result = channel.replicatePacket(new SessionReplicateDeliveryMessage(id, message.getMessageID()));
if (result == null)
{
@@ -757,16 +759,16 @@
return false;
}
- int creditsUsed;
+ int precalculateAvailableCredits;
if (availableCredits != null)
{
// Flow control needs to be done in advance.
- creditsUsed = preCalculateFlowControl();
+ precalculateAvailableCredits = preCalculateFlowControl();
}
else
{
- creditsUsed = 0;
+ precalculateAvailableCredits = 0;
}
if (!sentFirstMessage)
@@ -789,7 +791,7 @@
if (availableCredits != null)
{
- if ((creditsUsed -= initialMessage.getRequiredBufferSize()) < 0)
+ if ((precalculateAvailableCredits -= initialMessage.getRequiredBufferSize()) < 0)
{
log.warn("Credit logic is not working properly, too many credits were taken");
}
@@ -798,7 +800,7 @@
{
trace("deliverLargeMessage:: Initial send, taking out " + initialMessage.getRequiredBufferSize() +
" credits, current = " +
- creditsUsed +
+ precalculateAvailableCredits +
" isBackup = " +
messageQueue.isBackup());
@@ -815,7 +817,7 @@
while (positionPendingLargeMessage < sizePendingLargeMessage)
{
- if (creditsUsed <= 0)
+ if (precalculateAvailableCredits <= 0)
{
if (trace)
{
@@ -830,7 +832,7 @@
if (availableCredits != null)
{
- if ((creditsUsed -= chunk.getRequiredBufferSize()) < 0)
+ if ((precalculateAvailableCredits -= chunk.getRequiredBufferSize()) < 0)
{
log.warn("Flowcontrol logic is not working properly, too many credits were taken");
}
@@ -850,9 +852,9 @@
positionPendingLargeMessage += chunkLen;
}
- if (creditsUsed != 0)
+ if (precalculateAvailableCredits != 0)
{
- log.warn("Flowcontrol logic is not working properly... creidts = " + creditsUsed);
+ log.warn("Flowcontrol logic is not working properly... creidts = " + precalculateAvailableCredits);
}
if (trace)
@@ -880,29 +882,28 @@
*/
private int preCalculateFlowControl()
{
- for (;;)
+ while (true)
{
- final int currentCredit;
- int creditsUsed = 0;
- currentCredit = availableCredits.get();
+ final int currentCredit = availableCredits.get();
+ int precalculatedCredits = 0;
if (!sentFirstMessage)
{
- creditsUsed = SessionReceiveMessage.SESSION_RECEIVE_MESSAGE_LARGE_MESSAGE_SIZE + pendingLargeMessage.getPropertiesEncodeSize();
+ precalculatedCredits = SessionReceiveMessage.SESSION_RECEIVE_MESSAGE_LARGE_MESSAGE_SIZE + pendingLargeMessage.getPropertiesEncodeSize();
}
long chunkLen = 0;
- for (long i = positionPendingLargeMessage; creditsUsed < currentCredit && i < sizePendingLargeMessage; i += chunkLen)
+ for (long i = positionPendingLargeMessage; precalculatedCredits < currentCredit && i < sizePendingLargeMessage; i += chunkLen)
{
chunkLen = (int)Math.min(sizePendingLargeMessage - i, minLargeMessageSize);
- creditsUsed += chunkLen + SessionReceiveContinuationMessage.SESSION_RECEIVE_CONTINUATION_BASE_SIZE;
+ precalculatedCredits += chunkLen + SessionReceiveContinuationMessage.SESSION_RECEIVE_CONTINUATION_BASE_SIZE;
}
// The calculation of credits and taking credits out has to be taken atomically.
// Since we are not sending anything to the client during this calculation, this is unlikely to happen
- if (availableCredits.compareAndSet(currentCredit, currentCredit - creditsUsed))
+ if (availableCredits.compareAndSet(currentCredit, currentCredit - precalculatedCredits))
{
- return creditsUsed;
+ return precalculatedCredits;
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-01-22 14:37:22 UTC (rev 5689)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-01-22 20:03:36 UTC (rev 5690)
@@ -1029,16 +1029,19 @@
{
DelayedResult result = channel.replicatePacket(packet);
+ try
+ {
+ // Note we don't wait for response before handling this
+
+ consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to receive credits", e);
+ }
+
if (result == null)
{
- try
- {
- consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());
- }
- catch (Exception e)
- {
- log.error("Failed to receive credits", e);
- }
channel.confirm(packet);
}
else
@@ -1047,14 +1050,6 @@
{
public void run()
{
- try
- {
- consumers.get(packet.getConsumerID()).receiveCredits(packet.getCredits());
- }
- catch (Exception e)
- {
- log.error("Failed to receive credits", e);
- }
channel.confirm(packet);
}
});
@@ -1249,7 +1244,7 @@
try
{
- consumer.deliverReplicated(packet.getAddress(), packet.getMessageID());
+ consumer.deliverReplicated(packet.getMessageID());
}
catch (Exception e)
{
@@ -1403,6 +1398,7 @@
ServerConsumer consumer = new ServerConsumerImpl(idGenerator.generateID(),
this,
+ binding.getAddress(),
theQueue,
filter,
started,
Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java 2009-01-22 20:03:36 UTC (rev 5690)
@@ -0,0 +1,560 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.paging.PagingManager;
+import org.jboss.messaging.core.paging.PagingStore;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMConnector;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A PagingFailoverTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created Dec 8, 2008 10:53:16 AM
+ *
+ *
+ */
+public class PagingFailoverTest extends FailoverTestBase
+{
+
+ // Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(PagingFailoverTest.class);
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testFailoverOnPaging() throws Exception
+ {
+ testPaging(true);
+ }
+
+ public void testReplicationOnPaging() throws Exception
+ {
+ testPaging(false);
+ }
+
+ private void testPaging(final boolean fail) throws Exception
+ {
+ setUpFileBased(100 * 1024);
+
+ ClientSession session = null;
+ try
+ {
+ ClientSessionFactory sf1 = createFailoverFactory();
+
+ session = sf1.createSession(null, null, false, true, true, false, 0);
+
+ session.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ final int numMessages = 50000;
+
+ PagingManager pmLive = liveService.getServer().getPostOffice().getPagingManager();
+ PagingStore storeLive = pmLive.getPageStore(ADDRESS);
+
+ PagingManager pmBackup = backupService.getServer().getPostOffice().getPagingManager();
+ PagingStore storeBackup = pmBackup.getPageStore(ADDRESS);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(true);
+ ByteBuffer buffer = ByteBuffer.allocate(1000);
+
+ buffer.putInt(i);
+
+ buffer.rewind();
+
+ message.setBody(new ByteBufferWrapper(buffer));
+
+ producer.send(message);
+
+ if (storeLive.isPaging())
+ {
+ assertTrue(storeBackup.isPaging());
+ }
+ }
+
+ session.close();
+ session = sf1.createSession(null, null, false, true, true, false, 0);
+ session.start();
+
+ final RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
+
+ assertEquals("GloblSize", pmLive.getGlobalSize(), pmBackup.getGlobalSize());
+
+ assertEquals("PageSizeLive", storeLive.getAddressSize(), pmLive.getGlobalSize());
+
+ assertEquals("PageSizeBackup", storeBackup.getAddressSize(), pmBackup.getGlobalSize());
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ for (int i = 0; i < numMessages; i++)
+ {
+
+ if (fail && i == numMessages / 2)
+ {
+ conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+ }
+
+ ClientMessage message = consumer.receive(10000);
+
+
+ assertNotNull(message);
+
+ message.acknowledge();
+
+ message.getBody().rewind();
+
+ assertEquals(i, message.getBody().getInt());
+
+ }
+
+ session.close();
+ session = null;
+
+ if (!fail)
+ {
+ assertEquals(0, pmLive.getGlobalSize());
+ assertEquals(0, storeLive.getAddressSize());
+ }
+ assertEquals(0, pmBackup.getGlobalSize());
+ assertEquals(0, storeBackup.getAddressSize());
+
+ }
+ finally
+ {
+ if (session != null)
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Exception ignored)
+ {
+ // eat it
+ }
+ }
+ }
+
+ }
+
+ public void testMultithreadFailoverReplicationOnly() throws Throwable
+ {
+ setUpFileBased(100 * 1024, 20 * 1024);
+
+ int numberOfProducedMessages = multiThreadProducer(false);
+
+ System.out.println(numberOfProducedMessages + " messages produced");
+
+ int numberOfConsumedMessages = multiThreadConsumer(false, false);
+
+ assertEquals(numberOfProducedMessages, numberOfConsumedMessages);
+
+ }
+
+ public void testMultithreadFailoverOnProducing() throws Throwable
+ {
+ setUpFileBased(100 * 1024, 20 * 1024);
+
+ int numberOfProducedMessages = multiThreadProducer(true);
+
+ System.out.println(numberOfProducedMessages + " messages produced");
+
+ int numberOfConsumedMessages = multiThreadConsumer(true, false);
+
+ assertEquals(numberOfProducedMessages, numberOfConsumedMessages);
+
+ }
+
+ public void testMultithreadFailoverOnConsume() throws Throwable
+ {
+ setUpFileBased(100 * 1024, 20 * 1024);
+
+ int numberOfProducedMessages = multiThreadProducer(false);
+
+ System.out.println(numberOfProducedMessages + " messages produced");
+
+ int numberOfConsumedMessages = multiThreadConsumer(false, true);
+
+ assertEquals(numberOfProducedMessages, numberOfConsumedMessages);
+
+ }
+
+ /**
+ * @throws Exception
+ * @throws InterruptedException
+ * @throws Throwable
+ */
+ private int multiThreadConsumer(final boolean connectedOnBackup, final boolean fail) throws Exception,
+ InterruptedException,
+ Throwable
+ {
+ ClientSession session = null;
+ try
+ {
+ final AtomicInteger numberOfMessages = new AtomicInteger(0);
+
+ final int RECEIVE_TIMEOUT = 10000;
+
+ final ClientSessionFactory factory;
+ final PagingStore store;
+
+ if (connectedOnBackup)
+ {
+ factory = createBackupFactory();
+ store = backupService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
+ }
+ else
+ {
+ factory = createFailoverFactory();
+ store = liveService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
+ }
+
+ session = factory.createSession(false, true, true, false);
+
+ final int initialNumberOfPages = store.getNumberOfPages();
+
+ System.out.println("It has initially " + initialNumberOfPages);
+
+ final int THREAD_CONSUMERS = 20;
+
+ final CountDownLatch startFlag = new CountDownLatch(1);
+ final CountDownLatch alignSemaphore = new CountDownLatch(THREAD_CONSUMERS);
+
+ class Consumer extends Thread
+ {
+ volatile Throwable e;
+
+ ClientSession session;
+
+ public Consumer() throws Exception
+ {
+ session = factory.createSession(null, null, false, true, true, false, 0);
+ }
+
+ @Override
+ public void run()
+ {
+ boolean started = false;
+
+ try
+ {
+
+ try
+ {
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ session.start();
+
+ alignSemaphore.countDown();
+
+ started = true;
+
+ startFlag.await();
+
+ while (true)
+ {
+ ClientMessage msg = consumer.receive(RECEIVE_TIMEOUT);
+ if (msg == null)
+ {
+ break;
+ }
+
+ if (numberOfMessages.incrementAndGet() % 1000 == 0)
+ {
+ System.out.println(numberOfMessages + " messages read");
+ }
+
+ msg.acknowledge();
+ }
+
+ }
+ finally
+ {
+ session.close();
+ }
+ }
+ catch (Throwable e)
+ {
+ log.error(e.getMessage(), e);
+ if (!started)
+ {
+ alignSemaphore.countDown();
+ }
+ this.e = e;
+ }
+ }
+ }
+
+ Consumer[] consumers = new Consumer[THREAD_CONSUMERS];
+
+ for (int i = 0; i < THREAD_CONSUMERS; i++)
+ {
+ consumers[i] = new Consumer();
+ }
+
+ for (int i = 0; i < THREAD_CONSUMERS; i++)
+ {
+ consumers[i].start();
+ }
+
+ alignSemaphore.await();
+
+ startFlag.countDown();
+
+ if (fail)
+ {
+ Thread.sleep(1000);
+ while (store.getNumberOfPages() == initialNumberOfPages)
+ {
+ Thread.sleep(100);
+ }
+
+ System.out.println("The system has already depaged " + (initialNumberOfPages - store.getNumberOfPages()) +
+ ", failing now");
+
+ fail(session);
+ }
+
+ for (Thread t : consumers)
+ {
+ t.join();
+ }
+
+ for (Consumer p : consumers)
+ {
+ if (p.e != null)
+ {
+ throw p.e;
+ }
+ }
+
+ return numberOfMessages.intValue();
+ }
+ finally
+ {
+ if (session != null)
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Exception ignored)
+ {
+ }
+ }
+ }
+ }
+
+ /**
+ * @throws Exception
+ * @throws MessagingException
+ * @throws InterruptedException
+ * @throws Throwable
+ */
+ private int multiThreadProducer(final boolean failover) throws Exception,
+ MessagingException,
+ InterruptedException,
+ Throwable
+ {
+
+ final AtomicInteger numberOfMessages = new AtomicInteger(0);
+ final PagingStore store = liveService.getServer().getPostOffice().getPagingManager().getPageStore(ADDRESS);
+
+ final ClientSessionFactory factory = createFailoverFactory();
+
+ ClientSession session = factory.createSession(false, true, true, false);
+ try
+ {
+ session.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+ final int THREAD_PRODUCERS = 30;
+
+ final CountDownLatch startFlag = new CountDownLatch(1);
+ final CountDownLatch alignSemaphore = new CountDownLatch(THREAD_PRODUCERS);
+ final CountDownLatch flagPaging = new CountDownLatch(THREAD_PRODUCERS);
+
+ class Producer extends Thread
+ {
+ volatile Throwable e;
+
+ @Override
+ public void run()
+ {
+ boolean started = false;
+ try
+ {
+ ClientSession session = factory.createSession(false, true, true);
+ try
+ {
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ alignSemaphore.countDown();
+
+ started = true;
+ startFlag.await();
+
+ while (!store.isPaging())
+ {
+
+ ClientMessage msg = session.createClientMessage(true);
+
+ producer.send(msg);
+ numberOfMessages.incrementAndGet();
+ }
+
+ flagPaging.countDown();
+
+ for (int i = 0; i < 1000; i++)
+ {
+
+ ClientMessage msg = session.createClientMessage(true);
+
+ producer.send(msg);
+ numberOfMessages.incrementAndGet();
+
+ }
+
+ }
+ finally
+ {
+ session.close();
+ }
+ }
+ catch (Throwable e)
+ {
+ log.error(e.getMessage(), e);
+ if (!started)
+ {
+ alignSemaphore.countDown();
+ }
+ flagPaging.countDown();
+ this.e = e;
+ }
+ }
+ }
+
+ Producer[] producers = new Producer[THREAD_PRODUCERS];
+
+ for (int i = 0; i < THREAD_PRODUCERS; i++)
+ {
+ producers[i] = new Producer();
+ producers[i].start();
+ }
+
+ alignSemaphore.await();
+
+ // Start producing only when all the sessions are opened
+ startFlag.countDown();
+
+ if (failover)
+ {
+ flagPaging.await(); // for this test I want everybody on the paging part
+
+ Thread.sleep(1500);
+
+ fail(session);
+
+ }
+
+ for (Thread t : producers)
+ {
+ t.join();
+ }
+
+ for (Producer p : producers)
+ {
+ if (p.e != null)
+ {
+ throw p.e;
+ }
+ }
+
+ return numberOfMessages.intValue();
+
+ }
+ finally
+ {
+ session.close();
+ InVMConnector.resetFailures();
+ }
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ protected void fail(final ClientSession session) throws Exception
+ {
+ RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionImpl)session).getConnection();
+
+ InVMConnector.numberOfFailures = 1;
+ InVMConnector.failOnCreateConnection = true;
+ System.out.println("Forcing a failure");
+ conn.fail(new MessagingException(MessagingException.NOT_CONNECTED, "blah"));
+
+ }
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java 2009-01-22 14:37:22 UTC (rev 5689)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java 2009-01-22 20:03:36 UTC (rev 5690)
@@ -77,7 +77,7 @@
managerImpl.start();
- TestSupportPageStore store = (TestSupportPageStore)managerImpl.createPageStore(new SimpleString("simple-test"), true);
+ TestSupportPageStore store = (TestSupportPageStore)managerImpl.createPageStore(new SimpleString("simple-test"));
ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(10));
@@ -127,7 +127,7 @@
false);
managerImpl.start();
- managerImpl.createPageStore(new SimpleString("simple-test"), true);
+ managerImpl.createPageStore(new SimpleString("simple-test"));
ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(100));
More information about the jboss-cvs-commits
mailing list