JBoss hornetq SVN: r10169 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server/impl and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-02-01 22:36:20 -0500 (Tue, 01 Feb 2011)
New Revision: 10169
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
Making paging ignore topic subscriptions that use the invalid selector
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-02-02 02:21:13 UTC (rev 10168)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-02-02 03:36:20 UTC (rev 10169)
@@ -272,7 +272,7 @@
if (complete)
{
- log.debug("Address " + pagingStore.getAddress() + " is leaving page mode as all messages are consumed and acknowledged from the page store");
+ log.info("Address " + pagingStore.getAddress() + " is leaving page mode as all messages are consumed and acknowledged from the page store");
pagingStore.forceAnotherPage();
Page currentPage = pagingStore.getCurrentPage();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-02-02 02:21:13 UTC (rev 10168)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-02-02 03:36:20 UTC (rev 10169)
@@ -131,6 +131,13 @@
// ------------------------------------------------------------------------------------
private static final Logger log = Logger.getLogger(HornetQServerImpl.class);
+
+ // JMS Topics (which are outside the scope of the core API) require a dummy subscription with a dummy filter in the current version,
+ // as a way to keep its existence valid and for the TCK tests.
+ // That subscription needs an invalid filter; however, paging needs to ignore any subscription with this filter.
+ // For that reason, this filter needs to be rejected by paging and by any other component of the system, and simply be ignored for any purpose.
+ // It is declared here because this filter is considered a global ignore.
+ public static final String GENERIC_IGNORED_FILTER = "__HQX=-1";
// Static
// ---------------------------------------------------------------------------------------
@@ -1629,7 +1636,17 @@
long queueID = storageManager.generateUniqueID();
- PageSubscription pageSubscription = pagingManager.getPageStore(address).getCursorProvier().createSubscription(queueID, filter, durable);
+ PageSubscription pageSubscription;
+
+
+ if (filterString != null && filterString.toString().equals(GENERIC_IGNORED_FILTER))
+ {
+ pageSubscription = null;
+ }
+ else
+ {
+ pageSubscription = pagingManager.getPageStore(address).getCursorProvier().createSubscription(queueID, filter, durable);
+ }
final Queue queue = queueFactory.createQueue(queueID,
address,
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2011-02-02 02:21:13 UTC (rev 10168)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2011-02-02 03:36:20 UTC (rev 10169)
@@ -40,6 +40,7 @@
import org.hornetq.core.security.Role;
import org.hornetq.core.server.ActivateCallback;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.impl.HornetQServerImpl;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.ResourceManager;
import org.hornetq.core.transaction.TransactionDetail;
@@ -89,7 +90,7 @@
{
private static final Logger log = Logger.getLogger(JMSServerManagerImpl.class);
- private static final String REJECT_FILTER = "__HQX=-1";
+ private static final String REJECT_FILTER = HornetQServerImpl.GENERIC_IGNORED_FILTER;
private BindingRegistry registry;
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-02-02 02:21:13 UTC (rev 10168)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-02-02 03:36:20 UTC (rev 10169)
@@ -49,6 +49,7 @@
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.impl.HornetQServerImpl;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.tests.util.ServiceTestBase;
@@ -110,16 +111,16 @@
super.tearDown();
}
-
+
public void testPreparePersistent() throws Exception
{
boolean persistentMessages = true;
-
+
System.out.println("PageDir:" + getPageDir());
clearData();
Configuration config = createDefaultConfig();
-
+
config.setJournalSyncNonTransactional(false);
HornetQServer server = createServer(true,
@@ -129,7 +130,7 @@
new HashMap<String, AddressSettings>());
server.start();
-
+
final int messageSize = 1024;
final int numberOfMessages = 10000;
@@ -180,7 +181,7 @@
session.commit();
session.close();
session = null;
-
+
sf.close();
locator.close();
@@ -195,16 +196,15 @@
locator = createInVMNonHALocator();
sf = locator.createSessionFactory();
-
+
Queue queue = server.locateQueue(ADDRESS);
-
+
assertEquals(numberOfMessages, queue.getMessageCount());
-
LinkedList<Xid> xids = new LinkedList<Xid>();
-
+
int msgReceived = 0;
- for (int i = 0 ; i < numberOfMessages / 999; i++)
+ for (int i = 0; i < numberOfMessages / 999; i++)
{
ClientSession sessionConsumer = sf.createSession(true, false, false);
Xid xid = newXID();
@@ -212,7 +212,7 @@
sessionConsumer.start(xid, XAResource.TMNOFLAGS);
sessionConsumer.start();
ClientConsumer consumer = sessionConsumer.createConsumer(PagingTest.ADDRESS);
- for (int msgCount = 0 ; msgCount < 1000; i++)
+ for (int msgCount = 0; msgCount < 1000; i++)
{
if (msgReceived == numberOfMessages)
{
@@ -227,18 +227,17 @@
sessionConsumer.prepare(xid);
sessionConsumer.close();
}
-
-
+
ClientSession sessionCheck = sf.createSession(true, true);
-
+
ClientConsumer consumer = sessionCheck.createConsumer(PagingTest.ADDRESS);
-
+
assertNull(consumer.receiveImmediate());
sessionCheck.close();
-
+
System.out.println(queue.getMessagesAdded());
-
+
assertEquals(numberOfMessages, queue.getMessageCount());
sf.close();
@@ -263,26 +262,26 @@
consumer = session.createConsumer(PagingTest.ADDRESS);
session.start();
-
+
assertNull(consumer.receiveImmediate());
-
+
for (Xid xid : xids)
{
session.rollback(xid);
}
-
+
xids.clear();
-
+
assertNotNull(consumer.receiveImmediate());
session.close();
-
+
sf.close();
-
+
locator.close();
-
+
queue.getMessageCount();
- //assertEquals(numberOfMessages, queue.getMessageCount());
+ // assertEquals(numberOfMessages, queue.getMessageCount());
}
finally
{
@@ -297,7 +296,121 @@
}
+ public void testTwoQueuesOneNoRouting() throws Exception
+ {
+ boolean persistentMessages = true;
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 1000;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS.concat("-invalid"), new SimpleString(HornetQServerImpl.GENERIC_IGNORED_FILTER), true);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ byte[] body = new byte[messageSize];
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(persistentMessages);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = consumer.receive(5000);
+ assertNotNull(message);
+ message.acknowledge();
+
+ assertEquals(i, message.getIntProperty("id").intValue());
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ session.commit();
+
+ session.commit();
+
+ PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
+ store.getCursorProvier().cleanup();
+
+ long timeout = System.currentTimeMillis() + 5000;
+ while (store.isPaging() && timeout > System.currentTimeMillis())
+ {
+ Thread.sleep(100);
+ }
+
+ // It's async, so we need to wait a bit for it to happen
+ assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
+
+ sf.close();
+
+ locator.close();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ // System.exit(-1);
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
public void testSendReceivePagingPersistent() throws Exception
{
internaltestSendReceivePaging(true);
@@ -312,18 +425,18 @@
{
internalMultiQueuesTest(true);
}
-
+
public void testWithMultiQueues() throws Exception
{
internalMultiQueuesTest(false);
}
-
+
public void internalMultiQueuesTest(final boolean divert) throws Exception
{
clearData();
Configuration config = createDefaultConfig();
-
+
config.setJournalSyncNonTransactional(false);
HornetQServer server = createServer(true,
@@ -333,7 +446,7 @@
new HashMap<String, AddressSettings>());
if (divert)
- {
+ {
DivertConfiguration divert1 = new DivertConfiguration("dv1",
"nm1",
PagingTest.ADDRESS.toString(),
@@ -341,7 +454,7 @@
true,
null,
null);
-
+
DivertConfiguration divert2 = new DivertConfiguration("dv2",
"nm2",
PagingTest.ADDRESS.toString(),
@@ -349,11 +462,11 @@
true,
null,
null);
-
+
ArrayList<DivertConfiguration> divertList = new ArrayList<DivertConfiguration>();
divertList.add(divert1);
divertList.add(divert2);
-
+
config.setDivertConfigurations(divertList);
}
@@ -424,7 +537,7 @@
session.close();
server.stop();
-
+
sf.close();
locator.close();
}
@@ -472,7 +585,8 @@
Assert.assertNotNull(message2);
- if (i % 1000 == 0) session.commit();
+ if (i % 1000 == 0)
+ session.commit();
try
{
@@ -487,7 +601,7 @@
throw e;
}
}
-
+
session.commit();
consumer.close();
@@ -514,13 +628,12 @@
threads[i].join();
}
-
sf2.close();
locator.close();
assertEquals(0, errors.get());
-
- for (int i = 0 ; i < 20 && server.getPostOffice().getPagingManager().getTransactions().size() != 0; i++)
+
+ for (int i = 0; i < 20 && server.getPostOffice().getPagingManager().getTransactions().size() != 0; i++)
{
if (server.getPostOffice().getPagingManager().getTransactions().size() != 0)
{
@@ -547,12 +660,12 @@
private void internaltestSendReceivePaging(final boolean persistentMessages) throws Exception
{
-
+
System.out.println("PageDir:" + getPageDir());
clearData();
Configuration config = createDefaultConfig();
-
+
config.setJournalSyncNonTransactional(false);
HornetQServer server = createServer(true,
@@ -644,7 +757,8 @@
Assert.assertNotNull(message2);
- if (i % 1000 == 0) session.commit();
+ if (i % 1000 == 0)
+ session.commit();
try
{
@@ -663,9 +777,9 @@
consumer.close();
session.close();
-
+
sf.close();
-
+
locator.close();
}
finally
@@ -689,7 +803,7 @@
UnitTestCase.assertEqualsByteArrays(body, other);
}
-
+
/**
* - Make a destination in page mode
* - Add stuff to a transaction
@@ -813,9 +927,9 @@
consumer.close();
session.close();
-
+
sf.close();
-
+
locator.close();
}
finally
@@ -925,12 +1039,14 @@
assertFalse(msg.getBooleanProperty("new"));
Assert.assertNotNull(msg);
}
-
+
ClientMessage msgReceived = consumer.receiveImmediate();
-
+
if (msgReceived != null)
{
- System.out.println("new = " + msgReceived.getBooleanProperty("new") + " id = " + msgReceived.getIntProperty("id"));
+ System.out.println("new = " + msgReceived.getBooleanProperty("new") +
+ " id = " +
+ msgReceived.getIntProperty("id"));
}
Assert.assertNull(msgReceived);
@@ -972,9 +1088,9 @@
consumer.close();
session.close();
-
+
sf.close();
-
+
locator.close();
}
finally
@@ -1013,7 +1129,6 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
-
ClientSessionFactory sf = locator.createSessionFactory();
byte[] body = new byte[messageSize];
@@ -1034,7 +1149,7 @@
ClientMessage message = sessionNonTX.createMessage(true);
message.getBodyBuffer().writeBytes(body);
message.putIntProperty(new SimpleString("id"), i);
- message.putStringProperty(new SimpleString("tst"), new SimpleString("i=" + i));
+ message.putStringProperty(new SimpleString("tst"), new SimpleString("i=" + i));
producerTransacted.send(message);
@@ -1044,7 +1159,7 @@
for (int j = 0; j < 20; j++)
{
ClientMessage msgSend = sessionNonTX.createMessage(true);
- msgSend.putStringProperty(new SimpleString("tst"), new SimpleString("i=" + i + ", j=" + j));
+ msgSend.putStringProperty(new SimpleString("tst"), new SimpleString("i=" + i + ", j=" + j));
msgSend.getBodyBuffer().writeBytes(new byte[10 * 1024]);
producerNonTransacted.send(msgSend);
}
@@ -1106,9 +1221,9 @@
consumer.close();
sessionNonTX.close();
-
+
sf.close();
-
+
locator.close();
}
finally
@@ -1154,7 +1269,7 @@
try
{
-
+
final ClientSessionFactory sf = locator.createSessionFactory();
final byte[] body = new byte[messageSize];
@@ -1220,14 +1335,14 @@
ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
for (int i = 0; i < numberOfMessages; i++)
- {
+ {
ClientMessage msg = consumer.receive(5000);
assertNotNull(msg);
assertEquals(i, msg.getIntProperty("count").intValue());
msg.acknowledge();
if (i > 0 && i % 10 == 0)
{
- session.commit();
+ session.commit();
}
}
session.commit();
@@ -1235,9 +1350,9 @@
session.close();
producerThread.join();
-
+
locator.close();
-
+
sf.close();
assertEquals(0, errors.get());
@@ -1396,7 +1511,7 @@
clearData();
Configuration config = createDefaultConfig();
-
+
config.setJournalSyncNonTransactional(false);
HornetQServer server = createServer(true,
@@ -1650,28 +1765,27 @@
}
session.commit();
-
+
session.close();
-
+
locator.close();
-
+
locator = createInVMNonHALocator();
-
+
server.stop();
-
-
+
server = createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
new HashMap<String, AddressSettings>());
-
+
server.start();
sf = locator.createSessionFactory();
-
- session = sf.createSession(null, null, false, false, false, false, 0);
+ session = sf.createSession(null, null, false, false, false, false, 0);
+
ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
session.start();
@@ -1745,27 +1859,27 @@
}
session.commit();
-
+
session.close();
-
+
locator.close();
-
+
server.stop();
-
+
server = createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
new HashMap<String, AddressSettings>());
-
+
server.start();
-
+
locator = createInVMNonHALocator();
sf = locator.createSessionFactory();
-
- session = sf.createSession(null, null, false, false, false, false, 0);
+ session = sf.createSession(null, null, false, false, false, false, 0);
+
ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
session.start();
@@ -1781,25 +1895,25 @@
}
session.close();
-
+
locator.close();
-
+
server.stop();
-
+
server = createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
new HashMap<String, AddressSettings>());
-
+
server.start();
-
+
locator = createInVMNonHALocator();
sf = locator.createSessionFactory();
-
- session = sf.createSession(null, null, false, false, false, false, 0);
+ session = sf.createSession(null, null, false, false, false, false, 0);
+
consumer = session.createConsumer(PagingTest.ADDRESS);
session.start();
@@ -1979,7 +2093,7 @@
}
}
-
+
public void testDropMessagesExpiring() throws Exception
{
clearData();
@@ -2189,7 +2303,7 @@
}
}
-
+
public void testSyncPage() throws Exception
{
Configuration config = createDefaultConfig();
@@ -2205,74 +2319,73 @@
try
{
server.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true, false);
-
+
final CountDownLatch pageUp = new CountDownLatch(0);
final CountDownLatch pageDone = new CountDownLatch(1);
-
+
OperationContext ctx = new OperationContext()
{
-
+
public void onError(int errorCode, String errorMessage)
{
}
-
+
public void done()
{
}
-
+
public void storeLineUp()
{
}
-
+
public boolean waitCompletion(long timeout) throws Exception
{
return false;
}
-
+
public void waitCompletion() throws Exception
{
-
+
}
-
+
public void replicationLineUp()
{
-
+
}
-
+
public void replicationDone()
{
-
+
}
-
+
public void pageSyncLineUp()
{
pageUp.countDown();
}
-
+
public void pageSyncDone()
{
pageDone.countDown();
}
-
+
public void executeOnCompletion(IOAsyncTask runnable)
{
-
+
}
};
-
OperationContextImpl.setContext(ctx);
-
+
PagingManager paging = server.getPagingManager();
-
+
PagingStore store = paging.getPageStore(ADDRESS);
-
+
store.sync();
-
+
assertTrue(pageUp.await(10, TimeUnit.SECONDS));
-
+
assertTrue(pageDone.await(10, TimeUnit.SECONDS));
-
+
server.stop();
}
@@ -2289,7 +2402,6 @@
}
-
public void testSyncPageTX() throws Exception
{
Configuration config = createDefaultConfig();
@@ -2305,74 +2417,73 @@
try
{
server.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true, false);
-
+
final CountDownLatch pageUp = new CountDownLatch(0);
final CountDownLatch pageDone = new CountDownLatch(1);
-
+
OperationContext ctx = new OperationContext()
{
-
+
public void onError(int errorCode, String errorMessage)
{
}
-
+
public void done()
{
}
-
+
public void storeLineUp()
{
}
-
+
public boolean waitCompletion(long timeout) throws Exception
{
return false;
}
-
+
public void waitCompletion() throws Exception
{
-
+
}
-
+
public void replicationLineUp()
{
-
+
}
-
+
public void replicationDone()
{
-
+
}
-
+
public void pageSyncLineUp()
{
pageUp.countDown();
}
-
+
public void pageSyncDone()
{
pageDone.countDown();
}
-
+
public void executeOnCompletion(IOAsyncTask runnable)
{
-
+
}
};
-
OperationContextImpl.setContext(ctx);
-
+
PagingManager paging = server.getPagingManager();
-
+
PagingStore store = paging.getPageStore(ADDRESS);
-
+
store.sync();
-
+
assertTrue(pageUp.await(10, TimeUnit.SECONDS));
-
+
assertTrue(pageDone.await(10, TimeUnit.SECONDS));
-
+
server.stop();
}
@@ -2389,7 +2500,6 @@
}
-
public void testPagingOneDestinationOnly() throws Exception
{
SimpleString PAGED_ADDRESS = new SimpleString("paged");
@@ -2464,7 +2574,7 @@
}
consumerNonPaged.close();
-
+
session.commit();
ackList = null;
14 years, 10 months
JBoss hornetq SVN: r10168 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/paging/cursor/impl and 3 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-02-01 21:21:13 -0500 (Tue, 01 Feb 2011)
New Revision: 10168
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
Fixing Tests and Page Counters
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2011-02-01 17:02:29 UTC (rev 10167)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PageSubscription.java 2011-02-02 02:21:13 UTC (rev 10168)
@@ -96,6 +96,8 @@
void processReload() throws Exception;
+ void addPendingDelivery(final PagePosition position);
+
/**
* To be used on redeliveries
* @param position
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-02-01 17:02:29 UTC (rev 10167)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-02-02 02:21:13 UTC (rev 10168)
@@ -121,7 +121,7 @@
if (pos.getMessageNr() >= cache.getNumberOfMessages())
{
// sanity check, this should never happen unless there's a bug
- throw new IllegalStateException("Invalid messageNumber passed = " + pos);
+ throw new IllegalStateException("Invalid messageNumber passed = " + pos + " on " + cache);
}
return cache.getMessage(pos.getMessageNr());
@@ -255,7 +255,7 @@
cursorList.addAll(activeCursors.values());
long minPage = checkMinPage(cursorList);
-
+
if (minPage == pagingStore.getCurrentWritingPage() && pagingStore.getCurrentPage().getNumberOfMessages() > 0)
{
boolean complete = true;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-02-01 17:02:29 UTC (rev 10167)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageSubscriptionImpl.java 2011-02-02 02:21:13 UTC (rev 10168)
@@ -73,8 +73,7 @@
private static void trace(final String message)
{
- // PageCursorImpl.log.info(message);
- System.out.println(message);
+ PageSubscriptionImpl.log.info(message);
}
private volatile boolean autoCleanup = true;
@@ -102,6 +101,8 @@
private final SortedMap<Long, PageCursorInfo> consumedPages = Collections.synchronizedSortedMap(new TreeMap<Long, PageCursorInfo>());
private final PageSubscriptionCounter counter;
+
+ private final AtomicLong deliveredCount = new AtomicLong(0);
// We only store the position for redeliveries. They will be read from the SoftCache again during delivery.
private final ConcurrentLinkedQueue<PagePosition> redeliveries = new ConcurrentLinkedQueue<PagePosition>();
@@ -176,7 +177,7 @@
public long getMessageCount()
{
- return counter.getValue();
+ return counter.getValue() - deliveredCount.get();
}
public PageSubscriptionCounter getCounter()
@@ -272,7 +273,7 @@
synchronized (PageSubscriptionImpl.this)
{
for (PageCursorInfo completePage : completedPages)
- {
+ {
if (isTrace)
{
PageSubscriptionImpl.trace("Removing page " + completePage.getPageId());
@@ -474,6 +475,11 @@
return consumedPages.firstKey();
}
}
+
+ public void addPendingDelivery(final PagePosition position)
+ {
+ getPageInfo(position).incrementPendingTX();
+ }
/* (non-Javadoc)
* @see org.hornetq.core.paging.cursor.PageCursor#returnElement(org.hornetq.core.paging.cursor.PagePosition)
@@ -483,6 +489,15 @@
synchronized (redeliveries)
{
redeliveries.add(position);
+ PageCursorInfo pageInfo = consumedPages.get(position.getPageNr());
+ if (pageInfo != null)
+ {
+ pageInfo.decrementPendingTX();
+ }
+ else
+ {
+ // this shouldn't really happen.
+ }
}
}
@@ -823,6 +838,9 @@
// The page was live at the time of the creation
private final boolean wasLive;
+
+ // There's a pending TX to add elements on this page
+ private AtomicInteger pendingTX = new AtomicInteger(0);
// There's a pending delete on the async IO pipe
// We're holding this object to avoid delete the pages before the IO is complete,
@@ -856,7 +874,7 @@
public boolean isDone()
{
- return getNumberOfMessages() == confirmed.get();
+ return getNumberOfMessages() == confirmed.get() && pendingTX.get() == 0;
}
public boolean isPendingDelete()
@@ -876,6 +894,16 @@
{
return pageId;
}
+
+ public void incrementPendingTX()
+ {
+ pendingTX.incrementAndGet();
+ }
+
+ public void decrementPendingTX()
+ {
+ pendingTX.decrementAndGet();
+ }
public boolean isRemoved(final PagePosition pos)
{
@@ -967,6 +995,7 @@
for (PagePosition confirmed : positions)
{
cursor.processACK(confirmed);
+ cursor.deliveredCount.decrementAndGet();
}
}
@@ -1201,10 +1230,8 @@
*/
public void remove()
{
- if (!isredelivery)
- {
- PageSubscriptionImpl.this.getPageInfo(position).remove(position);
- }
+ deliveredCount.incrementAndGet();
+ PageSubscriptionImpl.this.getPageInfo(position).remove(position);
}
/* (non-Javadoc)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2011-02-01 17:02:29 UTC (rev 10167)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2011-02-02 02:21:13 UTC (rev 10168)
@@ -250,6 +250,7 @@
{
if (committed && useRedelivery)
{
+ cursor.addPendingDelivery(cursorPos);
cursor.redeliver(cursorPos);
return true;
}
@@ -271,6 +272,7 @@
{
lateDeliveries = new LinkedList<Pair<PageSubscription, PagePosition>>();
}
+ cursor.addPendingDelivery(cursorPos);
lateDeliveries.add(new Pair<PageSubscription, PagePosition>(cursor, cursorPos));
return true;
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-02-01 17:02:29 UTC (rev 10167)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-02-02 02:21:13 UTC (rev 10168)
@@ -849,8 +849,6 @@
return false;
}
- PagedMessage pagedMessage;
-
if (!message.isDurable())
{
// The address should never be transient when paging (even for non-persistent messages when paging)
@@ -858,8 +856,9 @@
message.bodyChanged();
}
- pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), installPageTransaction(tx, listCtx));
+ PagedMessage pagedMessage = new PagedMessageImpl(message, routeQueues(tx, listCtx), tx == null ? -1 : tx.getID());
+
int bytesToWrite = pagedMessage.getEncodeSize() + PageImpl.SIZE_RECORD;
if (currentPageSize.addAndGet(bytesToWrite) > pageSize && currentPage.getNumberOfMessages() > 0)
@@ -868,7 +867,9 @@
openNewPage();
currentPageSize.addAndGet(bytesToWrite);
}
-
+
+ installPageTransaction(tx, listCtx, currentPage.getPageId());
+
currentPage.write(pagedMessage);
if (tx != null)
@@ -920,11 +921,11 @@
return ids;
}
- private long installPageTransaction(final Transaction tx, final RouteContextList listCtx) throws Exception
+ private PageTransactionInfo installPageTransaction(final Transaction tx, final RouteContextList listCtx, int pageID) throws Exception
{
if (tx == null)
{
- return -1;
+ return null;
}
else
{
@@ -939,7 +940,7 @@
pgTX.increment(listCtx.getNumberOfQueues());
- return tx.getID();
+ return pgTX;
}
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-02-01 17:02:29 UTC (rev 10167)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-02-02 02:21:13 UTC (rev 10168)
@@ -668,7 +668,7 @@
if (pageSubscription != null)
{
// messageReferences will have depaged messages which we need to discount from the counter as they are counted on the pageSubscription as well
- return messageReferences.size() - pagedReferences.get() + getScheduledCount() + deliveringCount.get() + pageSubscription.getMessageCount();
+ return messageReferences.size() + getScheduledCount() + deliveringCount.get() + pageSubscription.getMessageCount();
}
else
{
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-02-01 17:02:29 UTC (rev 10167)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-02-02 02:21:13 UTC (rev 10168)
@@ -80,7 +80,7 @@
// Constants -----------------------------------------------------
private static final Logger log = Logger.getLogger(PagingTest.class);
- private static final int RECEIVE_TIMEOUT = 30000;
+ private static final int RECEIVE_TIMEOUT = 5000;
private static final int PAGE_MAX = 100 * 1024;
@@ -237,6 +237,8 @@
sessionCheck.close();
+ System.out.println(queue.getMessagesAdded());
+
assertEquals(numberOfMessages, queue.getMessageCount());
sf.close();
@@ -359,7 +361,7 @@
final int messageSize = 1024;
- final int numberOfMessages = 30000;
+ final int numberOfMessages = 3000;
final byte[] body = new byte[messageSize];
@@ -563,7 +565,7 @@
final int numberOfIntegers = 256;
- final int numberOfMessages = 10000;
+ final int numberOfMessages = 1000;
try
{
@@ -1032,6 +1034,7 @@
ClientMessage message = sessionNonTX.createMessage(true);
message.getBodyBuffer().writeBytes(body);
message.putIntProperty(new SimpleString("id"), i);
+ message.putStringProperty(new SimpleString("tst"), new SimpleString("i=" + i));
producerTransacted.send(message);
@@ -1041,6 +1044,7 @@
for (int j = 0; j < 20; j++)
{
ClientMessage msgSend = sessionNonTX.createMessage(true);
+ msgSend.putStringProperty(new SimpleString("tst"), new SimpleString("i=" + i + ", j=" + j));
msgSend.getBodyBuffer().writeBytes(new byte[10 * 1024]);
producerNonTransacted.send(msgSend);
}
@@ -1403,7 +1407,7 @@
server.start();
- final int numberOfMessages = 10000;
+ final int numberOfMessages = 1000;
final int numberOfBytes = 1024;
14 years, 10 months
JBoss hornetq SVN: r10167 - tags/HornetQ_2_2_EAP_CR1/src/config/common.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-02-01 12:02:29 -0500 (Tue, 01 Feb 2011)
New Revision: 10167
Modified:
tags/HornetQ_2_2_EAP_CR1/src/config/common/hornetq-version.properties
Log:
changing release name
Modified: tags/HornetQ_2_2_EAP_CR1/src/config/common/hornetq-version.properties
===================================================================
--- tags/HornetQ_2_2_EAP_CR1/src/config/common/hornetq-version.properties 2011-02-01 16:37:18 UTC (rev 10166)
+++ tags/HornetQ_2_2_EAP_CR1/src/config/common/hornetq-version.properties 2011-02-01 17:02:29 UTC (rev 10167)
@@ -1,4 +1,4 @@
-hornetq.version.versionName=QA_10162
+hornetq.version.versionName=EAP_CR1
hornetq.version.majorVersion=2
hornetq.version.minorVersion=2
hornetq.version.microVersion=0
14 years, 10 months
JBoss hornetq SVN: r10166 - tags.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-02-01 11:37:18 -0500 (Tue, 01 Feb 2011)
New Revision: 10166
Added:
tags/HornetQ_2_2_EAP_CR1/
Log:
CR1 tag for the EAP Release
Copied: tags/HornetQ_2_2_EAP_CR1 (from rev 10165, branches/Branch_2_2_EAP)
14 years, 10 months