Author: clebert.suconic(a)jboss.com
Date: 2011-02-01 22:36:20 -0500 (Tue, 01 Feb 2011)
New Revision: 10169
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
Adding ignore on topic subscription invalid selector on paging
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-02-02
02:21:13 UTC (rev 10168)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/impl/PageCursorProviderImpl.java 2011-02-02
03:36:20 UTC (rev 10169)
@@ -272,7 +272,7 @@
if (complete)
{
- log.debug("Address " + pagingStore.getAddress() + " is
leaving page mode as all messages are consumed and acknowledged from the page
store");
+ log.info("Address " + pagingStore.getAddress() + " is
leaving page mode as all messages are consumed and acknowledged from the page
store");
pagingStore.forceAnotherPage();
Page currentPage = pagingStore.getCurrentPage();
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-02-02
02:21:13 UTC (rev 10168)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-02-02
03:36:20 UTC (rev 10169)
@@ -131,6 +131,13 @@
//
------------------------------------------------------------------------------------
private static final Logger log = Logger.getLogger(HornetQServerImpl.class);
+
+ // JMS Topics (which are outside of the scope of the core API) will require a dumb
subscription with a dummy-filter at this current version
+ // as a way to keep its existence valid and TCK tests
+ // That subscription needs an invalid filter, however paging needs to ignore any
subscription with this filter.
+ // For that reason, this filter needs to be rejected on paging or any other component
on the system, and just be ignored for any purpose
+ // It's declared here as this filter is considered a global ignore
+ public static final String GENERIC_IGNORED_FILTER = "__HQX=-1";
// Static
//
---------------------------------------------------------------------------------------
@@ -1629,7 +1636,17 @@
long queueID = storageManager.generateUniqueID();
- PageSubscription pageSubscription =
pagingManager.getPageStore(address).getCursorProvier().createSubscription(queueID, filter,
durable);
+ PageSubscription pageSubscription;
+
+
+ if (filterString != null &&
filterString.toString().equals(GENERIC_IGNORED_FILTER))
+ {
+ pageSubscription = null;
+ }
+ else
+ {
+ pageSubscription =
pagingManager.getPageStore(address).getCursorProvier().createSubscription(queueID, filter,
durable);
+ }
final Queue queue = queueFactory.createQueue(queueID,
address,
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
---
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2011-02-02
02:21:13 UTC (rev 10168)
+++
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2011-02-02
03:36:20 UTC (rev 10169)
@@ -40,6 +40,7 @@
import org.hornetq.core.security.Role;
import org.hornetq.core.server.ActivateCallback;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.impl.HornetQServerImpl;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.ResourceManager;
import org.hornetq.core.transaction.TransactionDetail;
@@ -89,7 +90,7 @@
{
private static final Logger log = Logger.getLogger(JMSServerManagerImpl.class);
- private static final String REJECT_FILTER = "__HQX=-1";
+ private static final String REJECT_FILTER = HornetQServerImpl.GENERIC_IGNORED_FILTER;
private BindingRegistry registry;
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
---
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-02-02
02:21:13 UTC (rev 10168)
+++
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-02-02
03:36:20 UTC (rev 10169)
@@ -49,6 +49,7 @@
import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.impl.HornetQServerImpl;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.tests.util.ServiceTestBase;
@@ -110,16 +111,16 @@
super.tearDown();
}
-
+
public void testPreparePersistent() throws Exception
{
boolean persistentMessages = true;
-
+
System.out.println("PageDir:" + getPageDir());
clearData();
Configuration config = createDefaultConfig();
-
+
config.setJournalSyncNonTransactional(false);
HornetQServer server = createServer(true,
@@ -129,7 +130,7 @@
new HashMap<String, AddressSettings>());
server.start();
-
+
final int messageSize = 1024;
final int numberOfMessages = 10000;
@@ -180,7 +181,7 @@
session.commit();
session.close();
session = null;
-
+
sf.close();
locator.close();
@@ -195,16 +196,15 @@
locator = createInVMNonHALocator();
sf = locator.createSessionFactory();
-
+
Queue queue = server.locateQueue(ADDRESS);
-
+
assertEquals(numberOfMessages, queue.getMessageCount());
-
LinkedList<Xid> xids = new LinkedList<Xid>();
-
+
int msgReceived = 0;
- for (int i = 0 ; i < numberOfMessages / 999; i++)
+ for (int i = 0; i < numberOfMessages / 999; i++)
{
ClientSession sessionConsumer = sf.createSession(true, false, false);
Xid xid = newXID();
@@ -212,7 +212,7 @@
sessionConsumer.start(xid, XAResource.TMNOFLAGS);
sessionConsumer.start();
ClientConsumer consumer =
sessionConsumer.createConsumer(PagingTest.ADDRESS);
- for (int msgCount = 0 ; msgCount < 1000; i++)
+ for (int msgCount = 0; msgCount < 1000; i++)
{
if (msgReceived == numberOfMessages)
{
@@ -227,18 +227,17 @@
sessionConsumer.prepare(xid);
sessionConsumer.close();
}
-
-
+
ClientSession sessionCheck = sf.createSession(true, true);
-
+
ClientConsumer consumer = sessionCheck.createConsumer(PagingTest.ADDRESS);
-
+
assertNull(consumer.receiveImmediate());
sessionCheck.close();
-
+
System.out.println(queue.getMessagesAdded());
-
+
assertEquals(numberOfMessages, queue.getMessageCount());
sf.close();
@@ -263,26 +262,26 @@
consumer = session.createConsumer(PagingTest.ADDRESS);
session.start();
-
+
assertNull(consumer.receiveImmediate());
-
+
for (Xid xid : xids)
{
session.rollback(xid);
}
-
+
xids.clear();
-
+
assertNotNull(consumer.receiveImmediate());
session.close();
-
+
sf.close();
-
+
locator.close();
-
+
queue.getMessageCount();
- //assertEquals(numberOfMessages, queue.getMessageCount());
+ // assertEquals(numberOfMessages, queue.getMessageCount());
}
finally
{
@@ -297,7 +296,121 @@
}
+ public void testTwoQueuesOneNoRouting() throws Exception
+ {
+ boolean persistentMessages = true;
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ config.setJournalSyncNonTransactional(false);
+
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ server.start();
+
+ final int messageSize = 1024;
+
+ final int numberOfMessages = 1000;
+
+ try
+ {
+ ServerLocator locator = createInVMNonHALocator();
+
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+ session.createQueue(PagingTest.ADDRESS,
PagingTest.ADDRESS.concat("-invalid"), new
SimpleString(HornetQServerImpl.GENERIC_IGNORED_FILTER), true);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ byte[] body = new byte[messageSize];
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(persistentMessages);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ session.start();
+
+ ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = consumer.receive(5000);
+ assertNotNull(message);
+ message.acknowledge();
+
+ assertEquals(i, message.getIntProperty("id").intValue());
+ if (i % 1000 == 0)
+ {
+ session.commit();
+ }
+ }
+
+ session.commit();
+
+ session.commit();
+
+ session.commit();
+
+ PagingStore store = server.getPagingManager().getPageStore(ADDRESS);
+ store.getCursorProvier().cleanup();
+
+ long timeout = System.currentTimeMillis() + 5000;
+ while (store.isPaging() && timeout > System.currentTimeMillis())
+ {
+ Thread.sleep(100);
+ }
+
+ // It's async, so need to wait a bit for it happening
+ assertFalse(server.getPagingManager().getPageStore(ADDRESS).isPaging());
+
+ sf.close();
+
+ locator.close();
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ // System.exit(-1);
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
public void testSendReceivePagingPersistent() throws Exception
{
internaltestSendReceivePaging(true);
@@ -312,18 +425,18 @@
{
internalMultiQueuesTest(true);
}
-
+
public void testWithMultiQueues() throws Exception
{
internalMultiQueuesTest(false);
}
-
+
public void internalMultiQueuesTest(final boolean divert) throws Exception
{
clearData();
Configuration config = createDefaultConfig();
-
+
config.setJournalSyncNonTransactional(false);
HornetQServer server = createServer(true,
@@ -333,7 +446,7 @@
new HashMap<String, AddressSettings>());
if (divert)
- {
+ {
DivertConfiguration divert1 = new DivertConfiguration("dv1",
"nm1",
PagingTest.ADDRESS.toString(),
@@ -341,7 +454,7 @@
true,
null,
null);
-
+
DivertConfiguration divert2 = new DivertConfiguration("dv2",
"nm2",
PagingTest.ADDRESS.toString(),
@@ -349,11 +462,11 @@
true,
null,
null);
-
+
ArrayList<DivertConfiguration> divertList = new
ArrayList<DivertConfiguration>();
divertList.add(divert1);
divertList.add(divert2);
-
+
config.setDivertConfigurations(divertList);
}
@@ -424,7 +537,7 @@
session.close();
server.stop();
-
+
sf.close();
locator.close();
}
@@ -472,7 +585,8 @@
Assert.assertNotNull(message2);
- if (i % 1000 == 0) session.commit();
+ if (i % 1000 == 0)
+ session.commit();
try
{
@@ -487,7 +601,7 @@
throw e;
}
}
-
+
session.commit();
consumer.close();
@@ -514,13 +628,12 @@
threads[i].join();
}
-
sf2.close();
locator.close();
assertEquals(0, errors.get());
-
- for (int i = 0 ; i < 20 &&
server.getPostOffice().getPagingManager().getTransactions().size() != 0; i++)
+
+ for (int i = 0; i < 20 &&
server.getPostOffice().getPagingManager().getTransactions().size() != 0; i++)
{
if (server.getPostOffice().getPagingManager().getTransactions().size() != 0)
{
@@ -547,12 +660,12 @@
private void internaltestSendReceivePaging(final boolean persistentMessages) throws
Exception
{
-
+
System.out.println("PageDir:" + getPageDir());
clearData();
Configuration config = createDefaultConfig();
-
+
config.setJournalSyncNonTransactional(false);
HornetQServer server = createServer(true,
@@ -644,7 +757,8 @@
Assert.assertNotNull(message2);
- if (i % 1000 == 0) session.commit();
+ if (i % 1000 == 0)
+ session.commit();
try
{
@@ -663,9 +777,9 @@
consumer.close();
session.close();
-
+
sf.close();
-
+
locator.close();
}
finally
@@ -689,7 +803,7 @@
UnitTestCase.assertEqualsByteArrays(body, other);
}
-
+
/**
* - Make a destination in page mode
* - Add stuff to a transaction
@@ -813,9 +927,9 @@
consumer.close();
session.close();
-
+
sf.close();
-
+
locator.close();
}
finally
@@ -925,12 +1039,14 @@
assertFalse(msg.getBooleanProperty("new"));
Assert.assertNotNull(msg);
}
-
+
ClientMessage msgReceived = consumer.receiveImmediate();
-
+
if (msgReceived != null)
{
- System.out.println("new = " +
msgReceived.getBooleanProperty("new") + " id = " +
msgReceived.getIntProperty("id"));
+ System.out.println("new = " +
msgReceived.getBooleanProperty("new") +
+ " id = " +
+ msgReceived.getIntProperty("id"));
}
Assert.assertNull(msgReceived);
@@ -972,9 +1088,9 @@
consumer.close();
session.close();
-
+
sf.close();
-
+
locator.close();
}
finally
@@ -1013,7 +1129,6 @@
locator.setBlockOnDurableSend(true);
locator.setBlockOnAcknowledge(true);
-
ClientSessionFactory sf = locator.createSessionFactory();
byte[] body = new byte[messageSize];
@@ -1034,7 +1149,7 @@
ClientMessage message = sessionNonTX.createMessage(true);
message.getBodyBuffer().writeBytes(body);
message.putIntProperty(new SimpleString("id"), i);
- message.putStringProperty(new SimpleString("tst"), new
SimpleString("i=" + i));
+ message.putStringProperty(new SimpleString("tst"), new
SimpleString("i=" + i));
producerTransacted.send(message);
@@ -1044,7 +1159,7 @@
for (int j = 0; j < 20; j++)
{
ClientMessage msgSend = sessionNonTX.createMessage(true);
- msgSend.putStringProperty(new SimpleString("tst"), new
SimpleString("i=" + i + ", j=" + j));
+ msgSend.putStringProperty(new SimpleString("tst"), new
SimpleString("i=" + i + ", j=" + j));
msgSend.getBodyBuffer().writeBytes(new byte[10 * 1024]);
producerNonTransacted.send(msgSend);
}
@@ -1106,9 +1221,9 @@
consumer.close();
sessionNonTX.close();
-
+
sf.close();
-
+
locator.close();
}
finally
@@ -1154,7 +1269,7 @@
try
{
-
+
final ClientSessionFactory sf = locator.createSessionFactory();
final byte[] body = new byte[messageSize];
@@ -1220,14 +1335,14 @@
ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
for (int i = 0; i < numberOfMessages; i++)
- {
+ {
ClientMessage msg = consumer.receive(5000);
assertNotNull(msg);
assertEquals(i, msg.getIntProperty("count").intValue());
msg.acknowledge();
if (i > 0 && i % 10 == 0)
{
- session.commit();
+ session.commit();
}
}
session.commit();
@@ -1235,9 +1350,9 @@
session.close();
producerThread.join();
-
+
locator.close();
-
+
sf.close();
assertEquals(0, errors.get());
@@ -1396,7 +1511,7 @@
clearData();
Configuration config = createDefaultConfig();
-
+
config.setJournalSyncNonTransactional(false);
HornetQServer server = createServer(true,
@@ -1650,28 +1765,27 @@
}
session.commit();
-
+
session.close();
-
+
locator.close();
-
+
locator = createInVMNonHALocator();
-
+
server.stop();
-
-
+
server = createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
new HashMap<String, AddressSettings>());
-
+
server.start();
sf = locator.createSessionFactory();
-
- session = sf.createSession(null, null, false, false, false, false, 0);
+ session = sf.createSession(null, null, false, false, false, false, 0);
+
ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
session.start();
@@ -1745,27 +1859,27 @@
}
session.commit();
-
+
session.close();
-
+
locator.close();
-
+
server.stop();
-
+
server = createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
new HashMap<String, AddressSettings>());
-
+
server.start();
-
+
locator = createInVMNonHALocator();
sf = locator.createSessionFactory();
-
- session = sf.createSession(null, null, false, false, false, false, 0);
+ session = sf.createSession(null, null, false, false, false, false, 0);
+
ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
session.start();
@@ -1781,25 +1895,25 @@
}
session.close();
-
+
locator.close();
-
+
server.stop();
-
+
server = createServer(true,
config,
PagingTest.PAGE_SIZE,
PagingTest.PAGE_MAX,
new HashMap<String, AddressSettings>());
-
+
server.start();
-
+
locator = createInVMNonHALocator();
sf = locator.createSessionFactory();
-
- session = sf.createSession(null, null, false, false, false, false, 0);
+ session = sf.createSession(null, null, false, false, false, false, 0);
+
consumer = session.createConsumer(PagingTest.ADDRESS);
session.start();
@@ -1979,7 +2093,7 @@
}
}
-
+
public void testDropMessagesExpiring() throws Exception
{
clearData();
@@ -2189,7 +2303,7 @@
}
}
-
+
public void testSyncPage() throws Exception
{
Configuration config = createDefaultConfig();
@@ -2205,74 +2319,73 @@
try
{
server.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true, false);
-
+
final CountDownLatch pageUp = new CountDownLatch(0);
final CountDownLatch pageDone = new CountDownLatch(1);
-
+
OperationContext ctx = new OperationContext()
{
-
+
public void onError(int errorCode, String errorMessage)
{
}
-
+
public void done()
{
}
-
+
public void storeLineUp()
{
}
-
+
public boolean waitCompletion(long timeout) throws Exception
{
return false;
}
-
+
public void waitCompletion() throws Exception
{
-
+
}
-
+
public void replicationLineUp()
{
-
+
}
-
+
public void replicationDone()
{
-
+
}
-
+
public void pageSyncLineUp()
{
pageUp.countDown();
}
-
+
public void pageSyncDone()
{
pageDone.countDown();
}
-
+
public void executeOnCompletion(IOAsyncTask runnable)
{
-
+
}
};
-
OperationContextImpl.setContext(ctx);
-
+
PagingManager paging = server.getPagingManager();
-
+
PagingStore store = paging.getPageStore(ADDRESS);
-
+
store.sync();
-
+
assertTrue(pageUp.await(10, TimeUnit.SECONDS));
-
+
assertTrue(pageDone.await(10, TimeUnit.SECONDS));
-
+
server.stop();
}
@@ -2289,7 +2402,6 @@
}
-
public void testSyncPageTX() throws Exception
{
Configuration config = createDefaultConfig();
@@ -2305,74 +2417,73 @@
try
{
server.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true, false);
-
+
final CountDownLatch pageUp = new CountDownLatch(0);
final CountDownLatch pageDone = new CountDownLatch(1);
-
+
OperationContext ctx = new OperationContext()
{
-
+
public void onError(int errorCode, String errorMessage)
{
}
-
+
public void done()
{
}
-
+
public void storeLineUp()
{
}
-
+
public boolean waitCompletion(long timeout) throws Exception
{
return false;
}
-
+
public void waitCompletion() throws Exception
{
-
+
}
-
+
public void replicationLineUp()
{
-
+
}
-
+
public void replicationDone()
{
-
+
}
-
+
public void pageSyncLineUp()
{
pageUp.countDown();
}
-
+
public void pageSyncDone()
{
pageDone.countDown();
}
-
+
public void executeOnCompletion(IOAsyncTask runnable)
{
-
+
}
};
-
OperationContextImpl.setContext(ctx);
-
+
PagingManager paging = server.getPagingManager();
-
+
PagingStore store = paging.getPageStore(ADDRESS);
-
+
store.sync();
-
+
assertTrue(pageUp.await(10, TimeUnit.SECONDS));
-
+
assertTrue(pageDone.await(10, TimeUnit.SECONDS));
-
+
server.stop();
}
@@ -2389,7 +2500,6 @@
}
-
public void testPagingOneDestinationOnly() throws Exception
{
SimpleString PAGED_ADDRESS = new SimpleString("paged");
@@ -2464,7 +2574,7 @@
}
consumerNonPaged.close();
-
+
session.commit();
ackList = null;