Author: clebert.suconic(a)jboss.com
Date: 2011-12-23 09:05:26 -0500 (Fri, 23 Dec 2011)
New Revision: 11938
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
JBPAPP-7817 - removing unreferenced records during deletion of a destination
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2011-12-22
14:40:03 UTC (rev 11937)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2011-12-23
14:05:26 UTC (rev 11938)
@@ -58,7 +58,7 @@
void scheduleCleanupCheck();
- void cleanupEntries() throws Exception;
+ void cleanupEntries(boolean completeDelete) throws Exception;
void disableAutoCleanup();
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java 2011-12-22
14:40:03 UTC (rev 11937)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java 2011-12-23
14:05:26 UTC (rev 11938)
@@ -44,5 +44,8 @@
* @param variance
*/
void addInc(long id, int variance);
+
+ // used when deleting the counter
+ void delete() throws Exception;
}
\ No newline at end of file
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2011-12-22
14:40:03 UTC (rev 11937)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2011-12-23
14:05:26 UTC (rev 11938)
@@ -175,6 +175,25 @@
}
}
+
+ public void delete() throws Exception
+ {
+ synchronized (this)
+ {
+ long tx = storage.generateUniqueID();
+ for (Long record : incrementRecords)
+ {
+ storage.deleteIncrementRecord(tx, record.longValue());
+ }
+
+ if (recordID >= 0)
+ {
+ storage.deletePageCounter(tx, this.recordID);
+ }
+
+ storage.commit(tx);
+ }
+ }
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageSubscriptionCounter#loadInc(long, int)
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-12-22
14:40:03 UTC (rev 11937)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-12-23
14:05:26 UTC (rev 11938)
@@ -201,7 +201,7 @@
{
try
{
- cleanupEntries();
+ cleanupEntries(false);
}
catch (Exception e)
{
@@ -215,8 +215,12 @@
/**
* It will cleanup all the records for completed pages
* */
- public void cleanupEntries() throws Exception
+ public void cleanupEntries(final boolean completeDelete) throws Exception
{
+ if (completeDelete)
+ {
+ counter.delete();
+ }
Transaction tx = new TransactionImpl(store);
boolean persist = false;
@@ -292,7 +296,10 @@
}
}
- cursorProvider.scheduleCleanup();
+ if (!completeDelete)
+ {
+ cursorProvider.scheduleCleanup();
+ }
}
});
}
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-12-22
14:40:03 UTC (rev 11937)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-12-23
14:05:26 UTC (rev 11938)
@@ -1147,7 +1147,9 @@
}
else
{
- log.warn("Can't find queue " + encoding.queueID + "
while reloading ACKNOWLEDGE_CURSOR");
+ log.info("Can't find queue " + encoding.queueID + "
while reloading ACKNOWLEDGE_CURSOR, deleting record now");
+ messageJournal.appendDeleteRecord(record.id, false);
+
}
break;
@@ -1166,7 +1168,8 @@
}
else
{
- log.warn("Can't find queue " + encoding.queueID + "
while reloading ACKNOWLEDGE_CURSOR");
+ log.info("Can't find queue " + encoding.queueID + "
while reloading PAGE_CURSOR_COUNTER_VALUE, deleting record now");
+ messageJournal.appendDeleteRecord(record.id, false);
}
break;
@@ -1186,7 +1189,8 @@
}
else
{
- log.warn("Can't find queue " + encoding.queueID + "
while reloading ACKNOWLEDGE_CURSOR");
+ log.info("Can't find queue " + encoding.queueID + "
while reloading PAGE_CURSOR_COUNTER_INC, deleting record now");
+ messageJournal.appendDeleteRecord(record.id, false);
}
break;
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-12-22
14:40:03 UTC (rev 11937)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-12-23
14:05:26 UTC (rev 11938)
@@ -977,11 +977,6 @@
Queue queue = (Queue)binding.getBindable();
- if (queue.getPageSubscription() != null)
- {
- queue.getPageSubscription().close();
- }
-
if (queue.getConsumerCount() != 0)
{
throw new HornetQException(HornetQException.ILLEGAL_STATE, "Cannot delete
queue " + queue.getName() +
@@ -1004,14 +999,27 @@
}
}
+ postOffice.removeBinding(queueName);
+
queue.deleteAllReferences();
if (queue.isDurable())
{
storageManager.deleteQueueBinding(queue.getID());
}
+
- postOffice.removeBinding(queueName);
+ if (queue.getPageSubscription() != null)
+ {
+ queue.getPageSubscription().close();
+ }
+
+ PageSubscription subs = queue.getPageSubscription();
+
+ if (subs != null)
+ {
+ subs.cleanupEntries(true);
+ }
}
public synchronized void registerActivateCallback(final ActivateCallback callback)
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-12-22
14:40:03 UTC (rev 11937)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-12-23
14:05:26 UTC (rev 11938)
@@ -121,7 +121,7 @@
protected void tearDown() throws Exception
{
locator.close();
-
+
locator = null;
super.tearDown();
@@ -368,7 +368,6 @@
config.setJournalSyncNonTransactional(false);
-
HornetQServer server = createServer(true,
config,
PagingTest.PAGE_SIZE,
@@ -429,25 +428,25 @@
}
}
session.commit();
-
+
session.start();
-
+
ClientConsumer cons = session.createConsumer(ADDRESS);
-
- for (int i = 0 ; i < numberOfMessages; i++)
+
+ for (int i = 0; i < numberOfMessages; i++)
{
message = cons.receive(5000);
assertNotNull(message);
message.acknowledge();
-
+
if (i % 10 == 0)
{
session.commit();
}
}
-
+
session.commit();
-
+
}
finally
{
@@ -4283,6 +4282,146 @@
}
}
+ public void testTwoQueuesConsumeOneRestart() throws Exception
+ {
+ boolean persistentMessages = true;
+
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 1000;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setClientFailureCheckPeriod(120000);
+ locator.setConnectionTTL(5000000);
+ locator.setCallTimeout(120000);
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(PagingTest.ADDRESS,
PagingTest.ADDRESS.concat("=1"), null, true);
+ session.createQueue(PagingTest.ADDRESS,
PagingTest.ADDRESS.concat("=2"), null, true);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ byte[] body = new byte[messageSize];
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(persistentMessages);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty("propTest", i % 2 == 0 ? 1 : 2);
+
+ producer.send(message);
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ session.start();
+
+ // ClientConsumer consumer =
session.createConsumer(PagingTest.ADDRESS.concat("=1"));
+ //
+ // for (int i = 0; i < numberOfMessages; i++)
+ // {
+ // message = consumer.receive(500000);
+ // assertNotNull(message);
+ // message.acknowledge();
+ //
+ // // assertEquals(msg,
message.getIntProperty("propTest").intValue());
+ //
+ // System.out.println("i = " + i + " msg = " +
message.getIntProperty("propTest"));
+ // }
+ //
+ // session.commit();
+
+ // consumer.close();
+
+ session.deleteQueue(PagingTest.ADDRESS.concat("=1"));
+ // server.stop();
+ // server.start();
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, false, false);
+
+ session.start();
+
+ ClientConsumer consumer =
session.createConsumer(PagingTest.ADDRESS.concat("=2"));
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = consumer.receive(500000);
+ assertNotNull(message);
+ message.acknowledge();
+
+ // assertEquals(msg,
message.getIntProperty("propTest").intValue());
+
+ System.out.println("i = " + i + " msg = " +
message.getIntProperty("propTest"));
+ }
+
+ session.commit();
+
+ assertNull(consumer.receiveImmediate());
+
+ consumer.close();
+
+ // It's async, so need to wait a bit for it happening
+ assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
+
+ server.stop();
+
+ server.start();
+
+ server.stop();
+ server.start();
+
+ sf.close();
+
+ locator.close();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
public void testDLAOnLargeMessageAndPaging() throws Exception
{
clearData();
@@ -4499,7 +4638,7 @@
pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS);
-
pgStoreAddress.getCursorProvier().getSubscription(server.locateQueue(ADDRESS).getID()).cleanupEntries();
+
pgStoreAddress.getCursorProvier().getSubscription(server.locateQueue(ADDRESS).getID()).cleanupEntries(false);
pgStoreAddress.getCursorProvier().cleanup();