[jboss-cvs] JBoss Messaging SVN: r5572 - in trunk: tests/src/org/jboss/messaging/tests/integration/paging and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jan 2 19:00:24 EST 2009
Author: clebert.suconic at jboss.com
Date: 2009-01-02 19:00:23 -0500 (Fri, 02 Jan 2009)
New Revision: 5572
Modified:
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java
Log:
JBMESSAGING-1473 & JBMESSAGING-1474 - Fixing tests
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-01-02 22:50:41 UTC (rev 5571)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-01-03 00:00:23 UTC (rev 5572)
@@ -723,9 +723,16 @@
trace("Depaging....");
}
+ if (pagedMessages.size() == 0)
+ {
+ // nothing to be done on this case.
+ return;
+ }
+
+
// Depage has to be done atomically, in case of failure it should be
// back to where it was
-
+
Transaction depageTransaction = new TransactionImpl(storageManager, postOffice, true);
HashSet<PageTransactionInfo> pageTransactionsToUpdate = new HashSet<PageTransactionInfo>();
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java 2009-01-02 22:50:41 UTC (rev 5571)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java 2009-01-03 00:00:23 UTC (rev 5572)
@@ -38,6 +38,7 @@
import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.tests.util.ServiceTestBase;
import org.jboss.messaging.util.DataConstants;
@@ -73,7 +74,6 @@
{
super.tearDown();
}
-
public void testSendReceivePaging() throws Exception
{
clearData();
@@ -142,8 +142,6 @@
sf = createInVMFactory();
- System.out.println("Size = " + messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
-
assertTrue(messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize() > 0);
session = sf.createSession(null, null, false, true, true, false, 0);
@@ -196,7 +194,6 @@
}
-
/**
* - Make a destination in page mode
* - Add stuff to a transaction
@@ -244,34 +241,31 @@
{
message = session.createClientMessage(true);
message.setBody(bodyLocal);
-
+
// Stop sending message as soon as we start paging
if (messagingService.getServer().getPostOffice().getPagingManager().isPaging(ADDRESS))
{
break;
}
- numberOfMessages ++;
-
+ numberOfMessages++;
producer.send(message);
}
-
-
assertTrue(messagingService.getServer().getPostOffice().getPagingManager().isPaging(ADDRESS));
-
+
session.start();
-
+
ClientSession sessionTransacted = sf.createSession(null, null, false, false, false, false, 0);
ClientProducer producerTransacted = sessionTransacted.createProducer(ADDRESS);
-
- for (int i = 0; i< 10; i++)
+
+ for (int i = 0; i < 10; i++)
{
message = session.createClientMessage(true);
message.setBody(bodyLocal);
message.putIntProperty(new SimpleString("id"), i);
-
+
// Consume messages to force an eventual out of order delivery
if (i == 5)
{
@@ -282,13 +276,12 @@
msg.acknowledge();
assertNotNull(msg);
}
-
-
+
assertNull(consumer.receive(100));
consumer.close();
}
-
- Integer messageID = (Integer) message.getProperty(new SimpleString("id"));
+
+ Integer messageID = (Integer)message.getProperty(new SimpleString("id"));
assertNotNull(messageID);
assertEquals(messageID.intValue(), i);
@@ -298,9 +291,9 @@
ClientConsumer consumer = session.createConsumer(ADDRESS);
assertNull(consumer.receive(100));
-
+
sessionTransacted.commit();
-
+
sessionTransacted.close();
for (int i = 0; i < 10; i++)
@@ -308,15 +301,15 @@
message = consumer.receive(10000);
assertNotNull(message);
-
- Integer messageID = (Integer) message.getProperty(new SimpleString("id"));
-
+
+ Integer messageID = (Integer)message.getProperty(new SimpleString("id"));
+
assertNotNull(messageID);
assertEquals("message received out of order", messageID.intValue(), i);
message.acknowledge();
}
-
+
assertNull(consumer.receive(100));
consumer.close();
@@ -339,21 +332,15 @@
}
-
-
-
public void testPageOnSchedulingNoRestart() throws Exception
{
internalTestPageOnScheduling(false);
}
-
-
public void testPageOnSchedulingRestart() throws Exception
{
internalTestPageOnScheduling(true);
}
-
public void internalTestPageOnScheduling(final boolean restart) throws Exception
{
@@ -391,7 +378,7 @@
ClientMessage message = null;
MessagingBuffer body = null;
-
+
long scheduledTime = System.currentTimeMillis() + 5000;
for (int i = 0; i < numberOfMessages; i++)
@@ -411,26 +398,25 @@
message = session.createClientMessage(true);
message.setBody(bodyLocal);
message.putIntProperty(new SimpleString("id"), i);
-
+
// Worse scenario possible... only schedule what's on pages
if (messagingService.getServer().getPostOffice().getPagingManager().isPaging(ADDRESS))
{
message.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, scheduledTime);
}
-
producer.send(message);
}
if (restart)
{
session.close();
-
+
messagingService.stop();
-
+
messagingService = createService(true, config, new HashMap<String, QueueSettings>());
messagingService.start();
-
+
sf = createInVMFactory();
session = sf.createSession(null, null, false, true, true, false, 0);
@@ -449,7 +435,7 @@
message2.acknowledge();
assertNotNull(message2);
-
+
Long scheduled = (Long)message2.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
if (scheduled != null)
{
@@ -488,7 +474,6 @@
}
-
public void testRollbackOnSend() throws Exception
{
clearData();
@@ -520,9 +505,8 @@
ClientProducer producer = session.createProducer(ADDRESS);
-
long initialSize = messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize();
-
+
ByteBuffer ioBuffer = ByteBuffer.allocate(DataConstants.SIZE_INT * numberOfIntegers);
ClientMessage message = null;
@@ -543,18 +527,17 @@
producer.send(message);
}
-
+
session.rollback();
-
ClientConsumer consumer = session.createConsumer(ADDRESS);
session.start();
-
+
assertNull(consumer.receive(500));
-
+
session.close();
-
+
assertEquals(initialSize, messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
}
finally
@@ -570,8 +553,6 @@
}
-
-
public void testCommitOnSend() throws Exception
{
clearData();
@@ -603,9 +584,8 @@
ClientProducer producer = session.createProducer(ADDRESS);
-
long initialSize = messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize();
-
+
ByteBuffer ioBuffer = ByteBuffer.allocate(DataConstants.SIZE_INT * numberOfIntegers);
ClientMessage message = null;
@@ -626,7 +606,7 @@
producer.send(message);
}
-
+
session.commit();
ClientConsumer consumer = session.createConsumer(ADDRESS);
@@ -638,12 +618,11 @@
assertNotNull(msg);
msg.acknowledge();
}
-
-
+
session.commit();
-
+
session.close();
-
+
assertEquals(initialSize, messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
}
finally
@@ -659,6 +638,124 @@
}
+
+ public void testPageMultipleDestinations() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ final int MAX_SIZE = 90 * 1024; // this must be lower than minlargeMessageSize on the SessionFactory
+
+ final int NUMBER_OF_BINDINGS = 100;
+
+ int NUMBER_OF_MESSAGES = 2;
+
+ config.setPagingMaxGlobalSizeBytes(100 * 1024);
+ config.setPagingDefaultSize(10 * 1024);
+
+ MessagingService messagingService = createService(true, config, new HashMap<String, QueueSettings>());
+
+ messagingService.start();
+
+ try
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ sf.setBlockOnNonPersistentSend(true);
+ sf.setBlockOnPersistentSend(true);
+ sf.setBlockOnAcknowledge(true);
+
+ ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+
+ for (int i = 0; i < NUMBER_OF_BINDINGS; i++)
+ {
+ session.createQueue(ADDRESS, new SimpleString("someQueue" + i), null, true, false);
+ }
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ ByteBuffer ioBuffer = ByteBuffer.allocate(MAX_SIZE - 1024); // A single message with almost maxPageSize
+
+ ClientMessage message = null;
+
+ MessagingBuffer bodyLocal = new ByteBufferWrapper(ioBuffer);
+
+ message = session.createClientMessage(true);
+ message.setBody(bodyLocal);
+
+ for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
+ {
+ producer.send(message);
+ }
+
+ session.close();
+
+ messagingService.stop();
+
+ messagingService = createService(true, config, new HashMap<String, QueueSettings>());
+ messagingService.start();
+
+ sf = createInVMFactory();
+
+ assertTrue(messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize() > 0);
+
+ session = sf.createSession(null, null, false, true, true, false, 0);
+
+ session.start();
+
+ for (int msg = 0; msg < NUMBER_OF_MESSAGES; msg++)
+ {
+
+ for (int i = 0; i < NUMBER_OF_BINDINGS; i++)
+ {
+ ClientConsumer consumer = session.createConsumer(new SimpleString("someQueue" + i));
+
+ ClientMessage message2 = consumer.receive(1000);
+
+ assertNotNull(message2);
+
+ message2.acknowledge();
+
+ assertNotNull(message2);
+
+ consumer.close();
+
+ }
+ }
+
+ session.close();
+
+ for (int i = 0; i < NUMBER_OF_BINDINGS; i++)
+ {
+ Queue queue = (Queue)messagingService.getServer()
+ .getPostOffice()
+ .getBinding(new SimpleString("someQueue" + i))
+ .getBindable();
+
+ assertEquals("Queue someQueue" + i + " was supposed to be empty", 0, queue.getMessageCount());
+ assertEquals("Queue someQueue" + i + " was supposed to be empty", 0, queue.getDeliveringCount());
+ }
+
+ assertEquals("There are pending messages on the server", 0, messagingService.getServer()
+ .getPostOffice()
+ .getPagingManager()
+ .getGlobalSize());
+
+ }
+ finally
+ {
+ try
+ {
+ messagingService.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
More information about the jboss-cvs-commits
mailing list