JBoss hornetq SVN: r11942 - branches/Branch_2_2_EAP/src/config/common.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-23 11:54:27 -0500 (Fri, 23 Dec 2011)
New Revision: 11942
Modified:
branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties
Log:
version change
Modified: branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties
===================================================================
--- branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties 2011-12-23 15:40:21 UTC (rev 11941)
+++ branches/Branch_2_2_EAP/src/config/common/hornetq-version.properties 2011-12-23 16:54:27 UTC (rev 11942)
@@ -1,7 +1,7 @@
hornetq.version.versionName=HQ_2_2_10_EAP_GA
hornetq.version.majorVersion=2
hornetq.version.minorVersion=2
-hornetq.version.microVersion=8
+hornetq.version.microVersion=10
hornetq.version.incrementingVersion=122
hornetq.version.versionSuffix=GA
hornetq.version.versionTag=GA
13 years
JBoss hornetq SVN: r11941 - branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-23 10:40:21 -0500 (Fri, 23 Dec 2011)
New Revision: 11941
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
Log:
Backporting JBPAPP-7817
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2011-12-23 15:39:33 UTC (rev 11940)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2011-12-23 15:40:21 UTC (rev 11941)
@@ -181,17 +181,24 @@
synchronized (this)
{
long tx = storage.generateUniqueID();
+
+ boolean txUsed = false;
for (Long record : incrementRecords)
{
+ txUsed = true;
storage.deleteIncrementRecord(tx, record.longValue());
}
if (recordID >= 0)
{
+ txUsed = true;
storage.deletePageCounter(tx, this.recordID);
}
- storage.commit(tx);
+ if (txUsed)
+ {
+ storage.commit(tx);
+ }
}
}
13 years
JBoss hornetq SVN: r11940 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-23 10:39:33 -0500 (Fri, 23 Dec 2011)
New Revision: 11940
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
Log:
JBPAPP-7817 - removing unreferenced records during deletion of a destination
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2011-12-23 14:14:45 UTC (rev 11939)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2011-12-23 15:39:33 UTC (rev 11940)
@@ -181,17 +181,24 @@
synchronized (this)
{
long tx = storage.generateUniqueID();
+
+ boolean txUsed = false;
for (Long record : incrementRecords)
{
+ txUsed = true;
storage.deleteIncrementRecord(tx, record.longValue());
}
if (recordID >= 0)
{
+ txUsed = true;
storage.deletePageCounter(tx, this.recordID);
}
- storage.commit(tx);
+ if (txUsed)
+ {
+ storage.commit(tx);
+ }
}
}
13 years
JBoss hornetq SVN: r11939 - in branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710: src/main/org/hornetq/core/paging/cursor/impl and 3 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-23 09:14:45 -0500 (Fri, 23 Dec 2011)
New Revision: 11939
Modified:
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
Backporting JBPAPP-7817
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2011-12-23 14:05:26 UTC (rev 11938)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2011-12-23 14:14:45 UTC (rev 11939)
@@ -58,7 +58,7 @@
void scheduleCleanupCheck();
- void cleanupEntries() throws Exception;
+ void cleanupEntries(boolean completeDelete) throws Exception;
void disableAutoCleanup();
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java 2011-12-23 14:05:26 UTC (rev 11938)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java 2011-12-23 14:14:45 UTC (rev 11939)
@@ -44,5 +44,8 @@
* @param variance
*/
void addInc(long id, int variance);
+
+ // used when deleting the counter
+ void delete() throws Exception;
}
\ No newline at end of file
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2011-12-23 14:05:26 UTC (rev 11938)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2011-12-23 14:14:45 UTC (rev 11939)
@@ -175,6 +175,25 @@
}
}
+
+ public void delete() throws Exception
+ {
+ synchronized (this)
+ {
+ long tx = storage.generateUniqueID();
+ for (Long record : incrementRecords)
+ {
+ storage.deleteIncrementRecord(tx, record.longValue());
+ }
+
+ if (recordID >= 0)
+ {
+ storage.deletePageCounter(tx, this.recordID);
+ }
+
+ storage.commit(tx);
+ }
+ }
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageSubscriptionCounter#loadInc(long, int)
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-12-23 14:05:26 UTC (rev 11938)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-12-23 14:14:45 UTC (rev 11939)
@@ -201,7 +201,7 @@
{
try
{
- cleanupEntries();
+ cleanupEntries(false);
}
catch (Exception e)
{
@@ -215,8 +215,12 @@
/**
* It will cleanup all the records for completed pages
* */
- public void cleanupEntries() throws Exception
+ public void cleanupEntries(final boolean completeDelete) throws Exception
{
+ if (completeDelete)
+ {
+ counter.delete();
+ }
Transaction tx = new TransactionImpl(store);
boolean persist = false;
@@ -292,7 +296,10 @@
}
}
- cursorProvider.scheduleCleanup();
+ if (!completeDelete)
+ {
+ cursorProvider.scheduleCleanup();
+ }
}
});
}
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-12-23 14:05:26 UTC (rev 11938)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-12-23 14:14:45 UTC (rev 11939)
@@ -1084,7 +1084,8 @@
}
else
{
- log.warn("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR");
+ log.debug("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR");
+ messageJournal.appendDeleteRecord(record.id, false);
}
break;
@@ -1103,7 +1104,8 @@
}
else
{
- log.warn("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR");
+ log.debug("Can't find queue " + encoding.queueID + " while reloading PAGE_CURSOR_COUNTER_VALUE");
+ messageJournal.appendDeleteRecord(record.id, false);
}
break;
@@ -1123,7 +1125,8 @@
}
else
{
- log.warn("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR");
+ log.debug("Can't find queue " + encoding.queueID + " while reloading PAGE_CURSOR_COUNTER_INC");
+ messageJournal.appendDeleteRecord(record.id, false);
}
break;
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-12-23 14:05:26 UTC (rev 11938)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-12-23 14:14:45 UTC (rev 11939)
@@ -1088,11 +1088,6 @@
}
Queue queue = (Queue)binding.getBindable();
-
- if (queue.getPageSubscription() != null)
- {
- queue.getPageSubscription().close();
- }
if (queue.getConsumerCount() != 0)
{
@@ -1116,14 +1111,27 @@
}
}
+ postOffice.removeBinding(queueName);
+
queue.deleteAllReferences();
if (queue.isDurable())
{
storageManager.deleteQueueBinding(queue.getID());
}
+
- postOffice.removeBinding(queueName);
+ if (queue.getPageSubscription() != null)
+ {
+ queue.getPageSubscription().close();
+ }
+
+ PageSubscription subs = queue.getPageSubscription();
+
+ if (subs != null)
+ {
+ subs.cleanupEntries(true);
+ }
}
public synchronized void registerActivateCallback(final ActivateCallback callback)
Modified: branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-12-23 14:05:26 UTC (rev 11938)
+++ branches/one-offs/HornetQ_2_2_5_EAP_GA_JBPAPP_7710/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-12-23 14:14:45 UTC (rev 11939)
@@ -24,6 +24,7 @@
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.transaction.xa.XAResource;
@@ -121,6 +122,8 @@
{
locator.close();
+ locator = null;
+
super.tearDown();
}
@@ -644,8 +647,7 @@
PagingTest.PAGE_MAX,
new HashMap<String, AddressSettings>());
server.start();
-
-
+
locator = createInVMNonHALocator();
locator.setBlockOnNonDurableSend(true);
locator.setBlockOnDurableSend(true);
@@ -659,7 +661,7 @@
session = sf.createSession(false, false, false);
producer = session.createProducer(PagingTest.ADDRESS);
-
+
for (int i = 0; i < numberOfMessages; i++)
{
message = session.createMessage(true);
@@ -676,9 +678,9 @@
session.commit();
}
}
-
+
session.commit();
-
+
server.stop();
server = createServer(true,
@@ -693,7 +695,7 @@
queue = server.locateQueue(ADDRESS);
- // assertEquals(numberOfMessages, queue.getMessageCount());
+ // assertEquals(numberOfMessages, queue.getMessageCount());
xids = new LinkedList<Xid>();
@@ -725,7 +727,6 @@
sessionConsumer.close();
-
}
finally
{
@@ -1038,50 +1039,56 @@
server.start();
- ServerLocator locator = createInVMNonHALocator();
+ try
+ {
- locator.setBlockOnNonDurableSend(true);
- locator.setBlockOnDurableSend(true);
- locator.setBlockOnAcknowledge(true);
+ ServerLocator locator = createInVMNonHALocator();
- ClientSessionFactory csf = locator.createSessionFactory();
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
- ClientSession session = csf.createSession();
+ ClientSessionFactory csf = locator.createSessionFactory();
- session.start();
+ ClientSession session = csf.createSession();
- for (int i = 1; i <= 2; i++)
- {
- ClientConsumer cons = session.createConsumer("q" + i);
+ session.start();
- for (int j = 3; j < 6; j++)
+ for (int i = 1; i <= 2; i++)
{
- ClientMessage msg = cons.receive(5000);
+ ClientConsumer cons = session.createConsumer("q" + i);
- assertNotNull(msg);
+ for (int j = 3; j < 6; j++)
+ {
+ ClientMessage msg = cons.receive(5000);
- assertEquals("str-" + j, msg.getStringProperty("id"));
+ assertNotNull(msg);
- msg.acknowledge();
+ assertEquals("str-" + j, msg.getStringProperty("id"));
+
+ msg.acknowledge();
+ }
+
+ session.commit();
+ assertNull(cons.receive(500));
+
}
- session.commit();
- assertNull(cons.receive(500));
+ session.close();
- }
+ long timeout = System.currentTimeMillis() + 5000;
- session.close();
+ while (System.currentTimeMillis() < timeout && server.getPagingManager().getPageStore(ADDRESS).isPaging())
+ {
+ Thread.sleep(100);
+ }
- long timeout = System.currentTimeMillis() + 5000;
-
- while (System.currentTimeMillis() < timeout && server.getPagingManager().getPageStore(ADDRESS).isPaging())
+ locator.close();
+ }
+ finally
{
- Thread.sleep(100);
+ server.stop();
}
-
- locator.close();
-
- server.stop();
}
public void testTwoQueuesOneNoRouting() throws Exception
@@ -1275,6 +1282,40 @@
bb.put(getSamplebyte(j));
}
+ final AtomicBoolean running = new AtomicBoolean(true);
+
+ class TCount extends Thread
+ {
+ Queue queue;
+
+ TCount(Queue queue)
+ {
+ this.queue = queue;
+ }
+
+ public void run()
+ {
+ try
+ {
+ while (running.get())
+ {
+ // log.info("Message count = " + queue.getMessageCount() + " on queue " + queue.getName());
+ queue.getMessagesAdded();
+ queue.getMessageCount();
+ // log.info("Message added = " + queue.getMessagesAdded() + " on queue " + queue.getName());
+ Thread.sleep(10);
+ }
+ }
+ catch (InterruptedException e)
+ {
+ log.info("Thread interrupted");
+ }
+ }
+ };
+
+ TCount tcount1 = null;
+ TCount tcount2 = null;
+
try
{
{
@@ -1309,6 +1350,7 @@
{
if (i % 500 == 0)
{
+ log.info("Sent " + i + " messages");
session.commit();
}
message = session.createMessage(true);
@@ -1339,6 +1381,23 @@
new HashMap<String, AddressSettings>());
server.start();
+ Queue queue1 = server.locateQueue(PagingTest.ADDRESS.concat("-1"));
+
+ Queue queue2 = server.locateQueue(PagingTest.ADDRESS.concat("-2"));
+
+ assertNotNull(queue1);
+
+ assertNotNull(queue2);
+
+ assertNotSame(queue1, queue2);
+
+ tcount1 = new TCount(queue1);
+
+ tcount2 = new TCount(queue2);
+
+ tcount1.start();
+ tcount2.start();
+
ServerLocator locator = createInVMNonHALocator();
final ClientSessionFactory sf2 = locator.createSessionFactory();
@@ -1375,8 +1434,14 @@
Assert.assertNotNull(message2);
- if (i % 1000 == 0)
+ if (i % 100 == 0)
+ {
+ if (i % 5000 == 0)
+ {
+ log.info(addressToSubscribe + " consumed " + i + " messages");
+ }
session.commit();
+ }
try
{
@@ -1437,6 +1502,20 @@
}
finally
{
+ running.set(false);
+
+ if (tcount1 != null)
+ {
+ tcount1.interrupt();
+ tcount1.join();
+ }
+
+ if (tcount2 != null)
+ {
+ tcount2.interrupt();
+ tcount2.join();
+ }
+
try
{
server.stop();
@@ -2434,7 +2513,7 @@
producerThread.start();
- assertTrue(ready.await(10, TimeUnit.SECONDS));
+ assertTrue(ready.await(100, TimeUnit.SECONDS));
ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
@@ -4100,6 +4179,146 @@
}
}
+ public void testTwoQueuesConsumeOneRestart() throws Exception
+ {
+ boolean persistentMessages = true;
+
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 1000;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setClientFailureCheckPeriod(120000);
+ locator.setConnectionTTL(5000000);
+ locator.setCallTimeout(120000);
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=1"), null, true);
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=2"), null, true);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ byte[] body = new byte[messageSize];
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(persistentMessages);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty("propTest", i % 2 == 0 ? 1 : 2);
+
+ producer.send(message);
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ session.start();
+
+ // ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS.concat("=1"));
+ //
+ // for (int i = 0; i < numberOfMessages; i++)
+ // {
+ // message = consumer.receive(500000);
+ // assertNotNull(message);
+ // message.acknowledge();
+ //
+ // // assertEquals(msg, message.getIntProperty("propTest").intValue());
+ //
+ // System.out.println("i = " + i + " msg = " + message.getIntProperty("propTest"));
+ // }
+ //
+ // session.commit();
+
+ // consumer.close();
+
+ session.deleteQueue(PagingTest.ADDRESS.concat("=1"));
+ // server.stop();
+ // server.start();
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, false, false);
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS.concat("=2"));
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = consumer.receive(500000);
+ assertNotNull(message);
+ message.acknowledge();
+
+ // assertEquals(msg, message.getIntProperty("propTest").intValue());
+
+ System.out.println("i = " + i + " msg = " + message.getIntProperty("propTest"));
+ }
+
+ session.commit();
+
+ assertNull(consumer.receiveImmediate());
+
+ consumer.close();
+
+ // It's async, so need to wait a bit for it happening
+ assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
+
+ server.stop();
+
+ server.start();
+
+ server.stop();
+ server.start();
+
+ sf.close();
+
+ locator.close();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
public void testDLAOnLargeMessageAndPaging() throws Exception
{
clearData();
@@ -4316,7 +4535,7 @@
pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS);
- pgStoreAddress.getCursorProvier().getSubscription(server.locateQueue(ADDRESS).getID()).cleanupEntries();
+ pgStoreAddress.getCursorProvier().getSubscription(server.locateQueue(ADDRESS).getID()).cleanupEntries(false);
pgStoreAddress.getCursorProvier().cleanup();
@@ -4455,7 +4674,7 @@
for (int i = 0; i < 500; i++)
{
log.info("Received message " + i);
- message = cons.receive(5000);
+ message = cons.receive(10000);
assertNotNull(message);
message.acknowledge();
13 years
JBoss hornetq SVN: r11938 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/paging/cursor/impl and 3 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-23 09:05:26 -0500 (Fri, 23 Dec 2011)
New Revision: 11938
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
JBPAPP-7817 - removing unreferenced records during deletion of a destination
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2011-12-22 14:40:03 UTC (rev 11937)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2011-12-23 14:05:26 UTC (rev 11938)
@@ -58,7 +58,7 @@
void scheduleCleanupCheck();
- void cleanupEntries() throws Exception;
+ void cleanupEntries(boolean completeDelete) throws Exception;
void disableAutoCleanup();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java 2011-12-22 14:40:03 UTC (rev 11937)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscriptionCounter.java 2011-12-23 14:05:26 UTC (rev 11938)
@@ -44,5 +44,8 @@
* @param variance
*/
void addInc(long id, int variance);
+
+ // used when deleting the counter
+ void delete() throws Exception;
}
\ No newline at end of file
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2011-12-22 14:40:03 UTC (rev 11937)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionCounterImpl.java 2011-12-23 14:05:26 UTC (rev 11938)
@@ -175,6 +175,25 @@
}
}
+
+ public void delete() throws Exception
+ {
+ synchronized (this)
+ {
+ long tx = storage.generateUniqueID();
+ for (Long record : incrementRecords)
+ {
+ storage.deleteIncrementRecord(tx, record.longValue());
+ }
+
+ if (recordID >= 0)
+ {
+ storage.deletePageCounter(tx, this.recordID);
+ }
+
+ storage.commit(tx);
+ }
+ }
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageSubscriptionCounter#loadInc(long, int)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-12-22 14:40:03 UTC (rev 11937)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-12-23 14:05:26 UTC (rev 11938)
@@ -201,7 +201,7 @@
{
try
{
- cleanupEntries();
+ cleanupEntries(false);
}
catch (Exception e)
{
@@ -215,8 +215,12 @@
/**
* It will cleanup all the records for completed pages
* */
- public void cleanupEntries() throws Exception
+ public void cleanupEntries(final boolean completeDelete) throws Exception
{
+ if (completeDelete)
+ {
+ counter.delete();
+ }
Transaction tx = new TransactionImpl(store);
boolean persist = false;
@@ -292,7 +296,10 @@
}
}
- cursorProvider.scheduleCleanup();
+ if (!completeDelete)
+ {
+ cursorProvider.scheduleCleanup();
+ }
}
});
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-12-22 14:40:03 UTC (rev 11937)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-12-23 14:05:26 UTC (rev 11938)
@@ -1147,7 +1147,9 @@
}
else
{
- log.warn("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR");
+ log.info("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR, deleting record now");
+ messageJournal.appendDeleteRecord(record.id, false);
+
}
break;
@@ -1166,7 +1168,8 @@
}
else
{
- log.warn("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR");
+ log.info("Can't find queue " + encoding.queueID + " while reloading PAGE_CURSOR_COUNTER_VALUE, deleting record now");
+ messageJournal.appendDeleteRecord(record.id, false);
}
break;
@@ -1186,7 +1189,8 @@
}
else
{
- log.warn("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR");
+ log.info("Can't find queue " + encoding.queueID + " while reloading PAGE_CURSOR_COUNTER_INC, deleting record now");
+ messageJournal.appendDeleteRecord(record.id, false);
}
break;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-12-22 14:40:03 UTC (rev 11937)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-12-23 14:05:26 UTC (rev 11938)
@@ -977,11 +977,6 @@
Queue queue = (Queue)binding.getBindable();
- if (queue.getPageSubscription() != null)
- {
- queue.getPageSubscription().close();
- }
-
if (queue.getConsumerCount() != 0)
{
throw new HornetQException(HornetQException.ILLEGAL_STATE, "Cannot delete queue " + queue.getName() +
@@ -1004,14 +999,27 @@
}
}
+ postOffice.removeBinding(queueName);
+
queue.deleteAllReferences();
if (queue.isDurable())
{
storageManager.deleteQueueBinding(queue.getID());
}
+
- postOffice.removeBinding(queueName);
+ if (queue.getPageSubscription() != null)
+ {
+ queue.getPageSubscription().close();
+ }
+
+ PageSubscription subs = queue.getPageSubscription();
+
+ if (subs != null)
+ {
+ subs.cleanupEntries(true);
+ }
}
public synchronized void registerActivateCallback(final ActivateCallback callback)
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-12-22 14:40:03 UTC (rev 11937)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-12-23 14:05:26 UTC (rev 11938)
@@ -121,7 +121,7 @@
protected void tearDown() throws Exception
{
locator.close();
-
+
locator = null;
super.tearDown();
@@ -368,7 +368,6 @@
config.setJournalSyncNonTransactional(false);
-
HornetQServer server = createServer(true,
config,
PagingTest.PAGE_SIZE,
@@ -429,25 +428,25 @@
}
}
session.commit();
-
+
session.start();
-
+
ClientConsumer cons = session.createConsumer(ADDRESS);
-
- for (int i = 0 ; i < numberOfMessages; i++)
+
+ for (int i = 0; i < numberOfMessages; i++)
{
message = cons.receive(5000);
assertNotNull(message);
message.acknowledge();
-
+
if (i % 10 == 0)
{
session.commit();
}
}
-
+
session.commit();
-
+
}
finally
{
@@ -4283,6 +4282,146 @@
}
}
+ public void testTwoQueuesConsumeOneRestart() throws Exception
+ {
+ boolean persistentMessages = true;
+
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 1000;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setClientFailureCheckPeriod(120000);
+ locator.setConnectionTTL(5000000);
+ locator.setCallTimeout(120000);
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=1"), null, true);
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("=2"), null, true);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ byte[] body = new byte[messageSize];
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(persistentMessages);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty("propTest", i % 2 == 0 ? 1 : 2);
+
+ producer.send(message);
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ session.start();
+
+ // ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS.concat("=1"));
+ //
+ // for (int i = 0; i < numberOfMessages; i++)
+ // {
+ // message = consumer.receive(500000);
+ // assertNotNull(message);
+ // message.acknowledge();
+ //
+ // // assertEquals(msg, message.getIntProperty("propTest").intValue());
+ //
+ // System.out.println("i = " + i + " msg = " + message.getIntProperty("propTest"));
+ // }
+ //
+ // session.commit();
+
+ // consumer.close();
+
+ session.deleteQueue(PagingTest.ADDRESS.concat("=1"));
+ // server.stop();
+ // server.start();
+
+ sf = locator.createSessionFactory();
+
+ session = sf.createSession(false, false, false);
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS.concat("=2"));
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = consumer.receive(500000);
+ assertNotNull(message);
+ message.acknowledge();
+
+ // assertEquals(msg, message.getIntProperty("propTest").intValue());
+
+ System.out.println("i = " + i + " msg = " + message.getIntProperty("propTest"));
+ }
+
+ session.commit();
+
+ assertNull(consumer.receiveImmediate());
+
+ consumer.close();
+
+ // It's async, so need to wait a bit for it happening
+ assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
+
+ server.stop();
+
+ server.start();
+
+ server.stop();
+ server.start();
+
+ sf.close();
+
+ locator.close();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
public void testDLAOnLargeMessageAndPaging() throws Exception
{
clearData();
@@ -4499,7 +4638,7 @@
pgStoreAddress = server.getPagingManager().getPageStore(ADDRESS);
- pgStoreAddress.getCursorProvier().getSubscription(server.locateQueue(ADDRESS).getID()).cleanupEntries();
+ pgStoreAddress.getCursorProvier().getSubscription(server.locateQueue(ADDRESS).getID()).cleanupEntries(false);
pgStoreAddress.getCursorProvier().cleanup();
13 years
JBoss hornetq SVN: r11937 - tags.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-22 09:40:03 -0500 (Thu, 22 Dec 2011)
New Revision: 11937
Added:
tags/HornetQ_2_2_10_EAP_BUILD1/
Log:
2.2.10.EAP build
13 years
JBoss hornetq SVN: r11936 - in branches/Branch_2_2_EAP: hornetq-rest and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-22 09:37:50 -0500 (Thu, 22 Dec 2011)
New Revision: 11936
Modified:
branches/Branch_2_2_EAP/hornetq-rest/pom.xml
branches/Branch_2_2_EAP/pom.xml
Log:
version changes
Modified: branches/Branch_2_2_EAP/hornetq-rest/pom.xml
===================================================================
--- branches/Branch_2_2_EAP/hornetq-rest/pom.xml 2011-12-22 14:31:25 UTC (rev 11935)
+++ branches/Branch_2_2_EAP/hornetq-rest/pom.xml 2011-12-22 14:37:50 UTC (rev 11936)
@@ -10,7 +10,7 @@
<properties>
<resteasy.version>2.0.1.GA</resteasy.version>
- <hornetq.version>2.2.10.GA</hornetq.version>
+ <hornetq.version>2.2.10.EAP.GA</hornetq.version>
</properties>
<licenses>
Modified: branches/Branch_2_2_EAP/pom.xml
===================================================================
--- branches/Branch_2_2_EAP/pom.xml 2011-12-22 14:31:25 UTC (rev 11935)
+++ branches/Branch_2_2_EAP/pom.xml 2011-12-22 14:37:50 UTC (rev 11936)
@@ -19,7 +19,7 @@
<groupId>org.hornetq</groupId>
<artifactId>messaging</artifactId>
<packaging>pom</packaging>
- <version>2.2.10.GA</version>
+ <version>2.2.10.EAP.GA</version>
<properties>
<resteasy.version>2.1.0.GA</resteasy.version>
13 years
JBoss hornetq SVN: r11935 - tags.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-22 09:31:25 -0500 (Thu, 22 Dec 2011)
New Revision: 11935
Removed:
tags/HornetQ_2_2_10_EAP_BUILD1/
Log:
deleting tag - mistake on pom
13 years
JBoss hornetq SVN: r11934 - in branches/Branch_2_2_EAP: hornetq-rest and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-22 09:30:35 -0500 (Thu, 22 Dec 2011)
New Revision: 11934
Modified:
branches/Branch_2_2_EAP/hornetq-rest/pom.xml
branches/Branch_2_2_EAP/pom.xml
Log:
fixing version
Modified: branches/Branch_2_2_EAP/hornetq-rest/pom.xml
===================================================================
--- branches/Branch_2_2_EAP/hornetq-rest/pom.xml 2011-12-22 14:20:25 UTC (rev 11933)
+++ branches/Branch_2_2_EAP/hornetq-rest/pom.xml 2011-12-22 14:30:35 UTC (rev 11934)
@@ -10,7 +10,7 @@
<properties>
<resteasy.version>2.0.1.GA</resteasy.version>
- <hornetq.version>2.2.10.EAP.GA</hornetq.version>
+ <hornetq.version>2.2.10.GA</hornetq.version>
</properties>
<licenses>
Modified: branches/Branch_2_2_EAP/pom.xml
===================================================================
--- branches/Branch_2_2_EAP/pom.xml 2011-12-22 14:20:25 UTC (rev 11933)
+++ branches/Branch_2_2_EAP/pom.xml 2011-12-22 14:30:35 UTC (rev 11934)
@@ -19,7 +19,7 @@
<groupId>org.hornetq</groupId>
<artifactId>messaging</artifactId>
<packaging>pom</packaging>
- <version>2.2.1.GA</version>
+ <version>2.2.10.GA</version>
<properties>
<resteasy.version>2.1.0.GA</resteasy.version>
13 years
JBoss hornetq SVN: r11933 - tags.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-12-22 09:20:25 -0500 (Thu, 22 Dec 2011)
New Revision: 11933
Added:
tags/HornetQ_2_2_10_EAP_BUILD1/
Log:
Tagging 2.2.10 for QA
13 years