Author: clebert.suconic(a)jboss.com
Date: 2011-12-23 09:14:45 -0500 (Fri, 23 Dec 2011)
New Revision: 11939
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
Backporting JBPAPP-7817
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2011-12-23
14:05:26 UTC (rev 11938)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2011-12-23
14:14:45 UTC (rev 11939)
@@ -58,7 +58,7 @@
void scheduleCleanupCheck();
- void cleanupEntries() throws Exception;
+ void cleanupEntries(boolean completeDelete) throws Exception;
void disableAutoCleanup();
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java 2011-12-23
14:05:26 UTC (rev 11938)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java 2011-12-23
14:14:45 UTC (rev 11939)
@@ -44,5 +44,8 @@
* @param variance
*/
void addInc(long id, int variance);
+
+ // used when deleting the counter
+ void delete() throws Exception;
}
\ No newline at end of file
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2011-12-23
14:05:26 UTC (rev 11938)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2011-12-23
14:14:45 UTC (rev 11939)
@@ -175,6 +175,25 @@
}
}
+
+ public void delete() throws Exception
+ {
+ synchronized (this)
+ {
+ long tx = storage.generateUniqueID();
+ for (Long record : incrementRecords)
+ {
+ storage.deleteIncrementRecord(tx, record.longValue());
+ }
+
+ if (recordID >= 0)
+ {
+ storage.deletePageCounter(tx, this.recordID);
+ }
+
+ storage.commit(tx);
+ }
+ }
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageSubscriptionCounter#loadInc(long, int)
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-12-23
14:05:26 UTC (rev 11938)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-12-23
14:14:45 UTC (rev 11939)
@@ -201,7 +201,7 @@
{
try
{
- cleanupEntries();
+ cleanupEntries(false);
}
catch (Exception e)
{
@@ -215,8 +215,12 @@
/**
* It will cleanup all the records for completed pages
* */
- public void cleanupEntries() throws Exception
+ public void cleanupEntries(final boolean completeDelete) throws Exception
{
+ if (completeDelete)
+ {
+ counter.delete();
+ }
Transaction tx = new TransactionImpl(store);
boolean persist = false;
@@ -292,7 +296,10 @@
}
}
- cursorProvider.scheduleCleanup();
+ if (!completeDelete)
+ {
+ cursorProvider.scheduleCleanup();
+ }
}
});
}
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-12-23
14:05:26 UTC (rev 11938)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-12-23
14:14:45 UTC (rev 11939)
@@ -1084,7 +1084,8 @@
}
else
{
- log.warn("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR");
+ log.debug("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR");
+ messageJournal.appendDeleteRecord(record.id, false);
}
break;
@@ -1103,7 +1104,8 @@
}
else
{
- log.warn("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR");
+ log.debug("Can't find queue " + encoding.queueID + " while reloading PAGE_CURSOR_COUNTER_VALUE");
+ messageJournal.appendDeleteRecord(record.id, false);
}
break;
@@ -1123,7 +1125,8 @@
}
else
{
- log.warn("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR");
+ log.debug("Can't find queue " + encoding.queueID + " while reloading PAGE_CURSOR_COUNTER_INC");
+ messageJournal.appendDeleteRecord(record.id, false);
}
break;
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-12-23
14:05:26 UTC (rev 11938)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-12-23
14:14:45 UTC (rev 11939)
@@ -1088,11 +1088,6 @@
}
Queue queue = (Queue)binding.getBindable();
-
- if (queue.getPageSubscription() != null)
- {
- queue.getPageSubscription().close();
- }
if (queue.getConsumerCount() != 0)
{
@@ -1116,14 +1111,27 @@
}
}
+ postOffice.removeBinding(queueName);
+
queue.deleteAllReferences();
if (queue.isDurable())
{
storageManager.deleteQueueBinding(queue.getID());
}
+
- postOffice.removeBinding(queueName);
+ if (queue.getPageSubscription() != null)
+ {
+ queue.getPageSubscription().close();
+ }
+
+ PageSubscription subs = queue.getPageSubscription();
+
+ if (subs != null)
+ {
+ subs.cleanupEntries(true);
+ }
}
public synchronized void registerActivateCallback(final ActivateCallback callback)
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
---
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-12-23
14:05:26 UTC (rev 11938)
+++
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-12-23
14:14:45 UTC (rev 11939)
@@ -24,6 +24,7 @@
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.transaction.xa.XAResource;
@@ -121,6 +122,8 @@
{
locator.close();
+ locator = null;
+
super.tearDown();
}
@@ -644,8 +647,7 @@
PagingTest.PAGE_MAX,
new HashMap<String, AddressSettings>());
server.start();
-
-
+
locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
@@ -659,7 +661,7 @@
session = sf.createSession(false, false, false);
producer = session.createProducer(PagingTest.ADDRESS);
-
+
for (int i = 0; i < numberOfMessages; i++)
{
message = session.createMessage(true);
@@ -676,9 +678,9 @@
session.commit();
}
}
-
+
session.commit();
-
+
server.stop();
server = createServer(true,
@@ -693,7 +695,7 @@
queue = server.locateQueue(ADDRESS);
- // assertEquals(numberOfMessages, queue.getMessageCount());
+ // assertEquals(numberOfMessages, queue.getMessageCount());
xids = new LinkedList<Xid>();
@@ -725,7 +727,6 @@
sessionConsumer.close();
-
}
finally
{
@@ -1038,50 +1039,56 @@
server.start();
- ServerLocator locator = createInVMNonHALocator();
+ try
+ {
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setBlockOnAcknowledge(true);
+ ServerLocator locator = createInVMNonHALocator();
- ClientSessionFactory csf = locator.createSessionFactory();
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
- ClientSession session = csf.createSession();
+ ClientSessionFactory csf = locator.createSessionFactory();
- session.start();
+ ClientSession session = csf.createSession();
- for (int i = 1; i <= 2; i++)
- {
- ClientConsumer cons = session.createConsumer("q" + i);
+ session.start();
- for (int j = 3; j < 6; j++)
+ for (int i = 1; i <= 2; i++)
{
- ClientMessage msg = cons.receive(5000);
+ ClientConsumer cons = session.createConsumer("q" + i);
- assertNotNull(msg);
+ for (int j = 3; j < 6; j++)
+ {
+ ClientMessage msg = cons.receive(5000);
- assertEquals("str-" + j, msg.getStringProperty("id"));
+ assertNotNull(msg);
- msg.acknowledge();
+ assertEquals("str-" + j, msg.getStringProperty("id"));
+
+ msg.acknowledge();
+ }
+
+ session.commit();
+ assertNull(cons.receive(500));
+
}
- session.commit();
- assertNull(cons.receive(500));
+ session.close();
- }
+ long timeout = System.currentTimeMillis() + 5000;
- session.close();
+ while (System.currentTimeMillis() < timeout && server.getPagingManager().getPageStore(ADDRESS).isPaging())
+ {
+ Thread.sleep(100);
+ }
- long timeout = System.currentTimeMillis() + 5000;
-
- while (System.currentTimeMillis() < timeout && server.getPagingManager().getPageStore(ADDRESS).isPaging())
+ locator.close();
+ }
+ finally
{
- Thread.sleep(100);
+ server.stop();
}
-
- locator.close();
-
- server.stop();
}
public void testTwoQueuesOneNoRouting() throws Exception
@@ -1275,6 +1282,40 @@
bb.put(getSamplebyte(j));
}
+ final AtomicBoolean running = new AtomicBoolean(true);
+
+ class TCount extends Thread
+ {
+ Queue queue;
+
+ TCount(Queue queue)
+ {
+ this.queue = queue;
+ }
+
+ public void run()
+ {
+ try
+ {
+ while (running.get())
+ {
+ // log.info("Message count = " + queue.getMessageCount() + " on queue " + queue.getName());
+ queue.getMessagesAdded();
+ queue.getMessageCount();
+ // log.info("Message added = " + queue.getMessagesAdded() + " on queue " + queue.getName());
+ Thread.sleep(10);
+ }
+ }
+ catch (InterruptedException e)
+ {
+ log.info("Thread interrupted");
+ }
+ }
+ };
+
+ TCount tcount1 = null;
+ TCount tcount2 = null;
+
try
{
{
@@ -1309,6 +1350,7 @@
{
if (i % 500 == 0)
{
+ log.info("Sent " + i + " messages");
session.commit();
}
message = session.createMessage(true);
@@ -1339,6 +1381,23 @@
new HashMap<String, AddressSettings>());
server.start();
+ Queue queue1 = server.locateQueue(PagingTest.ADDRESS.concat("-1"));
+
+ Queue queue2 = server.locateQueue(PagingTest.ADDRESS.concat("-2"));
+
+ assertNotNull(queue1);
+
+ assertNotNull(queue2);
+
+ assertNotSame(queue1, queue2);
+
+ tcount1 = new TCount(queue1);
+
+ tcount2 = new TCount(queue2);
+
+ tcount1.start();
+ tcount2.start();
+
ServerLocator locator = createInVMNonHALocator();
final ClientSessionFactory sf2 = locator.createSessionFactory();
@@ -1375,8 +1434,14 @@
Assert.assertNotNull(message2);
- if (i % 1000 == 0)
+ if (i % 100 == 0)
+ {
+ if (i % 5000 == 0)
+ {
+ log.info(addressToSubscribe + " consumed " + i + " messages");
+ }
session.commit();
+ }
try
{
@@ -1437,6 +1502,20 @@
}
finally
{
+ running.set(false);
+
+ if (tcount1 != null)
+ {
+ tcount1.interrupt();
+ tcount1.join();
+ }
+
+ if (tcount2 != null)
+ {
+ tcount2.interrupt();
+ tcount2.join();
+ }
+
try
{
server.stop();
@@ -2434,7 +2513,7 @@
producerThread.start();
- assertTrue(ready.await(10, TimeUnit.SECONDS));
+ assertTrue(ready.await(100, TimeUnit.SECONDS));
ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
@@ -4100,6 +4179,146 @@
}
}
+ public void testTwoQueuesConsumeOneRestart() throws Exception
+ {
+ boolean persistentMessages = true;
+
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 1000;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setClientFailureCheckPeriod(120000);
+ locator.setConnectionTTL(5000000);
+ locator.setCallTimeout(120000);
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=1"), null, true);
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=2"), null, true);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ byte[] body = new byte[messageSize];
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(persistentMessages);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty("propTest", i % 2 == 0 ? 1 : 2);
+
+ producer.send(message);
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ session.start();
+
+ // ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS.concat("=1"));
+ //
+ // for (int i = 0; i < numberOfMessages; i++)
+ // {
+ // message = consumer.receive(500000);
+ // assertNotNull(message);
+ // message.acknowledge();
+ //
+ // // assertEquals(msg, message.getIntProperty("propTest").intValue());
+ //
+ // System.out.println("i = " + i + " msg = " + message.getIntProperty("propTest"));
+ // }
+ //
+ // session.commit();
+
+ // consumer.close();
+
+ session.deleteQueue(PagingTest.ADDRESS.concat("=1"));
+ // server.stop();
+ // server.start();
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, false, false);
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS.concat("=2"));
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = consumer.receive(500000);
+ assertNotNull(message);
+ message.acknowledge();
+
+ // assertEquals(msg, message.getIntProperty("propTest").intValue());
+
+ System.out.println("i = " + i + " msg = " + message.getIntProperty("propTest"));
+ }
+
+ session.commit();
+
+ assertNull(consumer.receiveImmediate());
+
+ consumer.close();
+
+ // It's async, so need to wait a bit for it happening
+ assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
+
+ server.stop();
+
+ server.start();
+
+ server.stop();
+ server.start();
+
+ sf.close();
+
+ locator.close();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
public void testDLAOnLargeMessageAndPaging() throws Exception
{
clearData();
@@ -4316,7 +4535,7 @@
pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS);
- pgStoreAddress.getCursorProvier().getSubscription(server.locateQueue(ADDRESS).getID()).cleanupEntries();
+ pgStoreAddress.getCursorProvier().getSubscription(server.locateQueue(ADDRESS).getID()).cleanupEntries(false);
pgStoreAddress.getCursorProvier().cleanup();
@@ -4455,7 +4674,7 @@
for (int i = 0; i < 500; i++)
{
log.info("Received message " + i);
- message = cons.receive(5000);
+ message = cons.receive(10000);
assertNotNull(message);
message.acknowledge();