[jboss-cvs] JBoss Messaging SVN: r5714 - in trunk: src/main/org/jboss/messaging/core/paging/impl and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Jan 23 18:48:02 EST 2009
Author: clebert.suconic at jboss.com
Date: 2009-01-23 18:48:02 -0500 (Fri, 23 Jan 2009)
New Revision: 5714
Modified:
trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java
Log:
https://jira.jboss.org/jira/browse/JBMESSAGING-1479 - re-enabling/fix drop messages
Modified: trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java 2009-01-23 22:41:03 UTC (rev 5713)
+++ trunk/src/main/org/jboss/messaging/core/paging/PagingStore.java 2009-01-23 23:48:02 UTC (rev 5714)
@@ -80,5 +80,5 @@
* @return
* @throws Exception
*/
- boolean addSize(long memoryEstimate) throws Exception;
+ void addSize(long memoryEstimate) throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-01-23 22:41:03 UTC (rev 5713)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java 2009-01-23 23:48:02 UTC (rev 5714)
@@ -204,7 +204,14 @@
currentPageLock.readLock().lock();
try
{
- return currentPage != null;
+ if (isDropWhenMaxSize())
+ {
+ return isDrop();
+ }
+ else
+ {
+ return currentPage != null;
+ }
}
finally
{
@@ -221,35 +228,19 @@
{
return storeName;
}
-
- public boolean addSize(final long size) throws Exception
+
+ public void addSize(final long size) throws Exception
{
final long maxSize = getMaxSizeBytes();
final long pageSize = getPageSizeBytes();
- if (isDropWhenMaxSize() && size > 0)
+ if (isDropWhenMaxSize())
{
- // if destination configured to drop messages && size is over the
- // limit, we return -1 which means drop the message
- if (getAddressSize() + size > maxSize || pagingManager.getMaxGlobalSize() > 0 &&
- pagingManager.getGlobalSize() + size > pagingManager.getMaxGlobalSize())
- {
- if (!printedDropMessagesWarning)
- {
- printedDropMessagesWarning = true;
+ addAddressSize(size);
+ pagingManager.addGlobalSize(size);
- log.warn("Messages are being dropped on adress " + getStoreName());
- }
-
- return false;
- }
- else
- {
- addAddressSize(size);
-
- return true;
- }
+ return;
}
else
{
@@ -303,7 +294,8 @@
(maxGlobalSize - pagingManager.getDefaultPageSize()));
}
- if (maxGlobalSize > 0 && pagingManager.isGlobalPageMode() && currentGlobalSize < maxGlobalSize - pagingManager.getDefaultPageSize())
+ if (maxGlobalSize > 0 && pagingManager.isGlobalPageMode() &&
+ currentGlobalSize < maxGlobalSize - pagingManager.getDefaultPageSize())
{
pagingManager.startGlobalDepage();
}
@@ -316,7 +308,7 @@
}
}
- return true;
+ return;
}
}
@@ -328,10 +320,24 @@
throw new IllegalStateException("PagingStore(" + getStoreName() + ") not initialized");
}
- // We should never page when drop-messages is activated.
if (dropMessagesWhenFull)
{
- return false;
+ if (isDrop())
+ {
+ if (!printedDropMessagesWarning)
+ {
+ printedDropMessagesWarning = true;
+
+ log.warn("Messages are being dropped on adress " + getStoreName());
+ }
+
+ // Address is full, we just pretend we are paging, and drop the data
+ return true;
+ }
+ else
+ {
+ return false;
+ }
}
// We need to ensure a read lock, as depage could change the paging state
@@ -455,7 +461,7 @@
{
return false;
}
-
+
if (pagingManager.isBackup())
{
return false;
@@ -820,29 +826,29 @@
// back to where it was
Transaction depageTransaction = new TransactionImpl(storageManager);
-
+
SendLock sendLock = postOffice.getAddressLock(destination);
-
+
sendLock.beforeSend();
-
+
try
{
depageTransaction.putProperty(TransactionPropertyIndexes.IS_DEPAGE, Boolean.valueOf(true));
-
+
HashSet<PageTransactionInfo> pageTransactionsToUpdate = new HashSet<PageTransactionInfo>();
-
+
for (PagedMessage pagedMessage : pagedMessages)
{
ServerMessage message = null;
-
+
message = pagedMessage.getMessage(storageManager);
-
+
final long transactionIdDuringPaging = pagedMessage.getTransactionID();
-
+
if (transactionIdDuringPaging >= 0)
{
final PageTransactionInfo pageTransactionInfo = pagingManager.getTransaction(transactionIdDuringPaging);
-
+
// http://wiki.jboss.org/wiki/JBossMessaging2Paging
// This is the Step D described on the "Transactions on Paging"
// section
@@ -851,28 +857,29 @@
log.warn("Transaction " + pagedMessage.getTransactionID() +
" used during paging not found, ignoring message " +
message);
-
+
continue;
}
-
+
// This is to avoid a race condition where messages are depaged
// before the commit arrived
-
+
while (running && !pageTransactionInfo.waitCompletion(500))
{
// This is just to give us a chance to interrupt the process..
- // if we start a shutdown in the middle of transactions, the commit/rollback may never come, delaying the shutdown of the server
+ // if we start a shutdown in the middle of transactions, the commit/rollback may never come, delaying
+ // the shutdown of the server
if (isTrace)
{
trace("Waiting pageTransaction to complete");
}
}
-
+
if (!running)
{
break;
}
-
+
if (!pageTransactionInfo.isCommit())
{
if (isTrace)
@@ -881,7 +888,7 @@
}
continue;
}
-
+
// Update information about transactions
if (message.isDurable())
{
@@ -889,21 +896,21 @@
pageTransactionsToUpdate.add(pageTransactionInfo);
}
}
-
+
postOffice.route(message, depageTransaction);
}
-
+
if (!running)
{
depageTransaction.rollback();
return false;
}
-
+
for (PageTransactionInfo pageWithTransaction : pageTransactionsToUpdate)
{
// This will set the journal transaction to commit;
depageTransaction.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
-
+
if (pageWithTransaction.getNumberOfMessages() == 0)
{
// http://wiki.jboss.org/wiki/JBossMessaging2Paging
@@ -916,9 +923,9 @@
storageManager.storePageTransaction(depageTransaction.getID(), pageWithTransaction);
}
}
-
+
depageTransaction.commit();
-
+
if (isTrace)
{
trace("Depage committed, running = " + running);
@@ -928,7 +935,7 @@
{
sendLock.afterSend();
}
-
+
return true;
}
@@ -1027,6 +1034,14 @@
return Integer.parseInt(fileName.substring(0, fileName.indexOf('.')));
}
+ // To be used on isDropMessagesWhenFull
+ private boolean isDrop()
+ {
+ return (getMaxSizeBytes() > 0 && getAddressSize() > getMaxSizeBytes()) ||
+ (pagingManager.getMaxGlobalSize() > 0 && pagingManager.getGlobalSize() > pagingManager.getMaxGlobalSize());
+ }
+
+
// Inner classes -------------------------------------------------
private class DepageRunnable implements Runnable
@@ -1050,7 +1065,8 @@
}
// Note: clearDepage is an atomic operation, it needs to be done even if readPage was not executed
- // however clearDepage shouldn't be executed if the page-store is being stopped, as stop will be holding the lock and this would dead lock
+ // however clearDepage shouldn't be executed if the page-store is being stopped, as stop will be holding
+ // the lock and this would dead lock
if (running && !clearDepage())
{
followingExecutor.execute(this);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java 2009-01-23 22:41:03 UTC (rev 5713)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingManagerIntegrationTest.java 2009-01-23 23:48:02 UTC (rev 5714)
@@ -107,48 +107,6 @@
}
- public void testPagingManagerAddressFull() throws Exception
- {
- HierarchicalRepository<QueueSettings> queueSettings = new HierarchicalObjectRepository<QueueSettings>();
- queueSettings.setDefault(new QueueSettings());
-
- QueueSettings simpleTestSettings = new QueueSettings();
- simpleTestSettings.setDropMessagesWhenFull(true);
- simpleTestSettings.setMaxSizeBytes(200);
-
- queueSettings.addMatch("simple-test", simpleTestSettings);
-
- PagingManagerImpl managerImpl = new PagingManagerImpl(new PagingStoreFactoryNIO(getJournalDir(), 10),
- null,
- queueSettings,
- -1,
- 1024 * 1024,
- false,
- false);
- managerImpl.start();
-
- managerImpl.createPageStore(new SimpleString("simple-test"));
-
- ServerMessage msg = createMessage(1l, new SimpleString("simple-test"), createRandomBuffer(100));
-
- assertTrue(managerImpl.getPageStore(msg.getDestination()).addSize(msg.getMemoryEstimate()));
-
- for (int i = 0; i < 10; i++)
- {
- long currentSize = managerImpl.getPageStore(new SimpleString("simple-test")).getAddressSize();
- assertFalse(managerImpl.getPageStore(msg.getDestination()).addSize(msg.getMemoryEstimate()));
-
- // should be unchanged
- assertEquals(currentSize, managerImpl.getPageStore(new SimpleString("simple-test")).getAddressSize());
- }
-
- managerImpl.getPageStore(msg.getDestination()).addSize(-msg.getMemoryEstimate());
-
- assertTrue(managerImpl.getPageStore(msg.getDestination()).addSize(msg.getMemoryEstimate()));
-
- managerImpl.stop();
- }
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java 2009-01-23 22:41:03 UTC (rev 5713)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/paging/PagingServiceIntegrationTest.java 2009-01-23 23:48:02 UTC (rev 5714)
@@ -74,6 +74,7 @@
{
super.tearDown();
}
+
public void testSendReceivePaging() throws Exception
{
clearData();
@@ -637,18 +638,197 @@
}
}
-
+
public void testPageMultipleDestinations() throws Exception
{
internalTestPageMultipleDestinations(false);
}
-
public void testPageMultipleDestinationsTransacted() throws Exception
{
internalTestPageMultipleDestinations(true);
}
-
+
+ public void testDropMessagesQueueMax() throws Exception
+ {
+ testDropMessages(false);
+ }
+
+ public void testDropMessagesGlobalMax() throws Exception
+ {
+ testDropMessages(true);
+ }
+
+ private void testDropMessages(boolean global) throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ if (global)
+ {
+ config.setPagingMaxGlobalSizeBytes(10 * 1024);
+ }
+ else
+ {
+ config.setPagingMaxGlobalSizeBytes(-1);
+ }
+
+ config.setPagingDefaultSize(10 * 1024);
+
+ HashMap<String, QueueSettings> settings = new HashMap<String, QueueSettings>();
+
+ QueueSettings set = new QueueSettings();
+ set.setDropMessagesWhenFull(true);
+ if (!global)
+ {
+ set.setMaxSizeBytes(10 * 1024);
+ }
+ else
+ {
+ set.setMaxSizeBytes(-1);
+
+ }
+ settings.put(ADDRESS.toString(), set);
+
+ MessagingService messagingService = createService(true, config, settings);
+
+ messagingService.start();
+
+ final int sizeOfMessage = 1024;
+ final int numberOfMessages = 1000;
+
+ try
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ sf.setBlockOnNonPersistentSend(true);
+ sf.setBlockOnPersistentSend(true);
+ sf.setBlockOnAcknowledge(true);
+
+ ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+
+ session.createQueue(ADDRESS, ADDRESS, null, true, false);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ ByteBuffer ioBuffer = ByteBuffer.allocate(sizeOfMessage);
+
+ ClientMessage message = null;
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ MessagingBuffer bodyLocal = new ByteBufferWrapper(ioBuffer);
+
+ message = session.createClientMessage(true);
+ message.setBody(bodyLocal);
+
+ producer.send(message);
+ }
+
+ session = sf.createSession(null, null, false, true, true, false, 0);
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ session.start();
+
+ for (int i = 0; i < 9; i++)
+ {
+ ClientMessage message2 = consumer.receive(10000);
+
+ assertNotNull(message2);
+
+ message2.acknowledge();
+ }
+
+ assertNull(consumer.receive(100));
+
+ assertEquals(0, messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
+ assertEquals(0, messagingService.getServer()
+ .getPostOffice()
+ .getPagingManager()
+ .getPageStore(ADDRESS)
+ .getAddressSize());
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ MessagingBuffer bodyLocal = new ByteBufferWrapper(ioBuffer);
+
+ message = session.createClientMessage(true);
+ message.setBody(bodyLocal);
+
+ producer.send(message);
+ }
+
+ for (int i = 0; i < 9; i++)
+ {
+ ClientMessage message2 = consumer.receive(10000);
+
+ assertNotNull(message2);
+
+ message2.acknowledge();
+ }
+
+ assertNull(consumer.receive(100));
+
+ session.close();
+
+ session = sf.createSession(false, true, true);
+
+ producer = session.createProducer(ADDRESS);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ MessagingBuffer bodyLocal = new ByteBufferWrapper(ioBuffer);
+
+ message = session.createClientMessage(true);
+ message.setBody(bodyLocal);
+
+ producer.send(message);
+ }
+
+ session.commit();
+
+ consumer = session.createConsumer(ADDRESS);
+
+ session.start();
+
+ for (int i = 0; i < 9; i++)
+ {
+ ClientMessage message2 = consumer.receive(10000);
+
+ assertNotNull(message2);
+
+ message2.acknowledge();
+ }
+
+ session.commit();
+
+ assertNull(consumer.receive(100));
+
+ session.close();
+
+ assertEquals(0, messagingService.getServer().getPostOffice().getPagingManager().getGlobalSize());
+ assertEquals(0, messagingService.getServer()
+ .getPostOffice()
+ .getPagingManager()
+ .getPageStore(ADDRESS)
+ .getAddressSize());
+
+ }
+ finally
+ {
+ try
+ {
+ messagingService.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
private void internalTestPageMultipleDestinations(boolean transacted) throws Exception
{
Configuration config = createDefaultConfig();
@@ -695,8 +875,8 @@
for (int i = 0; i < NUMBER_OF_MESSAGES; i++)
{
producer.send(message);
-
- if (transacted)
+
+ if (transacted)
{
session.commit();
}
More information about the jboss-cvs-commits
mailing list