Author: clebert.suconic(a)jboss.com
Date: 2011-04-07 18:04:34 -0400 (Thu, 07 Apr 2011)
New Revision: 10463
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/scheduling/ScheduledMessageTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-6273 - Scheduled Messages Issue
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-04-07 18:52:30 UTC (rev 10462)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-04-07 22:04:34 UTC (rev 10463)
@@ -1144,10 +1144,18 @@
}
Collection<AddMessageRecord> valueRecords = queueRecords.values();
+
+ long currentTime = System.currentTimeMillis();
for (AddMessageRecord record : valueRecords)
{
long scheduledDeliveryTime = record.scheduledDeliveryTime;
+
+ if (scheduledDeliveryTime != 0 && scheduledDeliveryTime <= currentTime)
+ {
+ scheduledDeliveryTime = 0;
+ record.message.removeProperty(Message.HDR_SCHEDULED_DELIVERY_TIME);
+ }
if (scheduledDeliveryTime != 0)
{
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2011-04-07 18:52:30 UTC (rev 10462)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/cluster/distribution/MessageRedistributionTest.java 2011-04-07 22:04:34 UTC (rev 10463)
@@ -13,7 +13,18 @@
package org.hornetq.tests.integration.cluster.distribution;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.server.cluster.ClusterConnection;
import org.hornetq.core.server.cluster.MessageFlowRecord;
import org.hornetq.core.server.cluster.impl.ClusterConnectionImpl;
@@ -333,6 +344,118 @@
verifyReceiveAll(20, 1);
}
+ public void testRedistributeWithScheduling() throws Exception
+ {
+ setupCluster(false);
+
+ AddressSettings setting = new AddressSettings();
+ setting.setRedeliveryDelay(10000);
+ servers[0].getAddressSettingsRepository().addMatch("queues.testaddress", setting);
+ servers[0].getAddressSettingsRepository().addMatch("queue0", setting);
+ servers[1].getAddressSettingsRepository().addMatch("queue0", setting);
+ servers[1].getAddressSettingsRepository().addMatch("queues.testaddress", setting);
+
+ startServers(0);
+
+ setupSessionFactory(0, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, false);
+
+ ClientSession session0 = sfs[0].createSession(false, false, false);
+
+ ClientProducer prod0 = session0.createProducer("queues.testaddress");
+
+ for (int i = 0 ; i < 100; i++)
+ {
+ ClientMessage msg = session0.createMessage(true);
+ msg.putIntProperty("key", i);
+
+ byte[] bytes = new byte[24];
+
+ ByteBuffer bb = ByteBuffer.wrap(bytes);
+
+ bb.putLong((long)i);
+
+ msg.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, bytes);
+
+ prod0.send(msg);
+
+ session0.commit();
+ }
+
+ session0.close();
+
+ session0 = sfs[0].createSession(true, false, false);
+
+ ClientConsumer consumer0 = session0.createConsumer("queue0");
+
+ session0.start();
+
+ ArrayList<Xid> xids = new ArrayList<Xid>();
+
+ for (int i = 0 ; i < 100; i++)
+ {
+ Xid xid = newXID();
+
+ session0.start(xid, XAResource.TMNOFLAGS);
+
+ ClientMessage msg = consumer0.receive(5000);
+
+ msg.acknowledge();
+
+ session0.end(xid, XAResource.TMSUCCESS);
+
+ session0.prepare(xid);
+
+ xids.add(xid);
+ }
+
+ session0.close();
+
+ sfs[0].close();
+ sfs[0] = null;
+
+
+ startServers(0, 1, 2);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ createQueue(1, "queues.testaddress", "queue0", null, false);
+ createQueue(2, "queues.testaddress", "queue0", null, false);
+
+ ClientSession session1 = sfs[1].createSession(false, false);
+ session1.start();
+ ClientConsumer consumer1 = session1.createConsumer("queue0");
+
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+ waitForBindings(2, "queues.testaddress", 1, 0, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 1, false);
+ waitForBindings(1, "queues.testaddress", 2, 0, false);
+ waitForBindings(2, "queues.testaddress", 2, 1, false);
+
+ session0 = sfs[0].createSession(true, false, false);
+
+ for (Xid xid: xids)
+ {
+ session0.rollback(xid);
+ }
+
+
+ for (int i = 0 ; i < 100; i++)
+ {
+ ClientMessage msg = consumer1.receive(15000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ }
+
+ session1.commit();
+
+ }
+
public void testRedistributionWhenConsumerIsClosedQueuesWithFilters() throws Exception
{
setupCluster(false);
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/scheduling/ScheduledMessageTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/scheduling/ScheduledMessageTest.java 2011-04-07 18:52:30 UTC (rev 10462)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/scheduling/ScheduledMessageTest.java 2011-04-07 22:04:34 UTC (rev 10463)
@@ -12,6 +12,9 @@
*/
package org.hornetq.tests.integration.scheduling;
+import java.util.ArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
@@ -32,6 +35,7 @@
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:clebert.suconic">Clebert Suconic</a>
*/
public class ScheduledMessageTest extends ServiceTestBase
{
@@ -52,6 +56,14 @@
{
super.setUp();
clearData();
+ startServer();
+ }
+
+ /**
+ * @throws Exception
+ */
+ protected void startServer() throws Exception
+ {
configuration = createDefaultConfig();
configuration.setSecurityEnabled(false);
configuration.setJournalMinFiles(2);
@@ -64,7 +76,7 @@
protected void tearDown() throws Exception
{
locator.close();
-
+
if (server != null)
{
try
@@ -282,10 +294,10 @@
session.createQueue(atestq, atestq, null, true);
ClientProducer producer = session.createProducer(atestq);
ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
- false,
- 0,
- System.currentTimeMillis(),
- (byte)1);
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
message.getBodyBuffer().writeString("testINVMCoreClient");
message.setDurable(true);
long time = System.currentTimeMillis();
@@ -484,6 +496,47 @@
session.close();
}
+
+ public void testManyMessagesSameTime() throws Exception
+ {
+
+ ClientSessionFactory sessionFactory = locator.createSessionFactory();
+ ClientSession session = sessionFactory.createSession(false, false, false);
+ session.createQueue(atestq, atestq, null, true);
+ ClientProducer producer = session.createProducer(atestq);
+ long time = System.currentTimeMillis();
+ time += 1000;
+
+ for (int i = 0; i < 1000; i++)
+ {
+ ClientMessage message = session.createMessage(true);
+ message.putIntProperty("value", i);
+ message.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, time);
+ producer.send(message);
+ }
+
+ session.commit();
+
+
+ session.start();
+ ClientConsumer consumer = session.createConsumer(atestq);
+
+ for (int i = 0 ; i < 1000; i++)
+ {
+ ClientMessage message = consumer.receive(15000);
+ assertNotNull(message);
+ message.acknowledge();
+
+ assertEquals(i, message.getIntProperty("value").intValue());
+ }
+
+ session.commit();
+
+ Assert.assertNull(consumer.receiveImmediate());
+
+ session.close();
+ }
+
public void testScheduledAndNormalMessagesDeliveredCorrectly(final boolean recover) throws Exception
{
@@ -607,7 +660,76 @@
Assert.assertNull(consumer.receiveImmediate());
session.close();
}
+
+
+ public void testPendingACKOnPrepared() throws Exception
+ {
+
+ int NUMBER_OF_MESSAGES = 100;
+
+ ClientSessionFactory sessionFactory = locator.createSessionFactory();
+ ClientSession session = sessionFactory.createSession(true, false, false);
+ session.createQueue(atestq, atestq, null, true);
+ ClientProducer producer = session.createProducer(atestq);
+
+ long scheduled = System.currentTimeMillis() + 1000;
+ for (int i = 0 ; i < NUMBER_OF_MESSAGES; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+ msg.putIntProperty("value", i);
+ msg.putLongProperty(Message.HDR_SCHEDULED_DELIVERY_TIME, scheduled);
+ producer.send(msg);
+ }
+
+ session.close();
+
+
+ for (int i = 0 ; i < NUMBER_OF_MESSAGES; i++)
+ {
+ Xid xid = newXID();
+
+ session = sessionFactory.createSession(true, false, false);
+
+ ClientConsumer consumer = session.createConsumer(atestq);
+
+ session.start();
+
+ session.start(xid, XAResource.TMNOFLAGS);
+
+ ClientMessage msg = consumer.receive(5000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ session.end(xid, XAResource.TMSUCCESS);
+
+ session.prepare(xid);
+
+ session.close();
+ }
+
+ sessionFactory.close();
+ locator.close();
+
+ server.stop();
+
+ startServer();
+
+ sessionFactory = locator.createSessionFactory();
+
+ session = sessionFactory.createSession(false, false);
+
+ ClientConsumer consumer = session.createConsumer(atestq);
+
+ session.start();
+
+ assertNull(consumer.receive(1000));
+
+ session.close();
+
+ sessionFactory.close();
+
+ }
+
public void testScheduledDeliveryTX() throws Exception
{
scheduledDelivery(true);
@@ -618,6 +740,116 @@
scheduledDelivery(false);
}
+ public void testRedeliveryAfterPrepare() throws Exception
+ {
+ AddressSettings qs = new AddressSettings();
+ qs.setRedeliveryDelay(5000l);
+ server.getAddressSettingsRepository().addMatch(atestq.toString(), qs);
+
+ ClientSessionFactory sessionFactory = locator.createSessionFactory();
+ ClientSession session = sessionFactory.createSession(false, false, false);
+
+ session.createQueue(atestq, atestq, true);
+
+ ClientProducer producer = session.createProducer(atestq);
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage msg = session.createMessage(true);
+ msg.putIntProperty("key", i);
+ producer.send(msg);
+ session.commit();
+ }
+
+ session.close();
+
+ session = sessionFactory.createSession(true, false, false);
+
+ ClientConsumer consumer = session.createConsumer(atestq);
+
+ ArrayList<Xid> xids = new ArrayList<Xid>();
+
+ session.start();
+
+ for (int i = 0; i < 100; i++)
+ {
+ Xid xid = newXID();
+ session.start(xid, XAResource.TMNOFLAGS);
+ ClientMessage msg = consumer.receive(5000);
+ assertNotNull(msg);
+ msg.acknowledge();
+ session.end(xid, XAResource.TMSUCCESS);
+ session.prepare(xid);
+ xids.add(xid);
+ }
+
+ session.rollback(xids.get(0));
+ xids.set(0, null);
+ session.close();
+
+ server.stop();
+
+ configuration = createDefaultConfig();
+ configuration.setSecurityEnabled(false);
+ configuration.setJournalMinFiles(2);
+ configuration.getAddressesSettings().put(atestq.toString(), qs);
+
+ server = createServer(true, configuration);
+ server.start();
+
+ locator = createInVMNonHALocator();
+
+ final AtomicInteger count = new AtomicInteger(0);
+ Thread t = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ ClientSessionFactory sf = locator.createSessionFactory();
+ ClientSession session = sf.createSession(false, false);
+ session.start();
+ ClientConsumer cons = session.createConsumer(atestq);
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage msg = cons.receive(100000);
+ assertNotNull(msg);
+ System.out.println("Received message " + msg);
+ count.incrementAndGet();
+ msg.acknowledge();
+ session.commit();
+ }
+ session.close();
+ sf.close();
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ count.set(-1);
+ }
+ }
+ };
+
+ t.start();
+
+ sessionFactory = locator.createSessionFactory();
+
+ session = sessionFactory.createSession(true, false, false);
+
+ for (Xid xid : xids)
+ {
+ if (xid != null)
+ {
+ session.rollback(xid);
+ }
+ }
+
+ session.close();
+
+ t.join();
+
+ assertEquals(100, count.get());
+ }
+
// Private -------------------------------------------------------
private void scheduledDelivery(final boolean tx) throws Exception
@@ -770,10 +1002,10 @@
private ClientMessage createDurableMessage(final ClientSession session, final String body)
{
ClientMessage message = session.createMessage(HornetQTextMessage.TYPE,
- true,
- 0,
- System.currentTimeMillis(),
- (byte)1);
+ true,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
message.getBodyBuffer().writeString(body);
return message;
}