[hornetq-commits] JBoss hornetq SVN: r11920 - in branches/Branch_2_2_EAP: tests/src/org/hornetq/tests/integration/client and 1 other directory.
do-not-reply at jboss.org
do-not-reply at jboss.org
Tue Dec 20 21:47:10 EST 2011
Author: clebert.suconic at jboss.com
Date: 2011-12-20 21:47:09 -0500 (Tue, 20 Dec 2011)
New Revision: 11920
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-7161 - improving deletion of messages after duplicate detection
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-12-21 02:13:14 UTC (rev 11919)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-12-21 02:47:09 UTC (rev 11920)
@@ -1181,6 +1181,8 @@
{
context.getTransaction().markAsRollbackOnly(new HornetQException(HornetQException.DUPLICATE_ID_REJECTED, warnMessage.toString()));
}
+
+ message.decrementRefCount();
return false;
}
@@ -1196,6 +1198,9 @@
cacheBridge.addToCache(bridgeDupBytes, context.getTransaction());
message.removeProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID);
+
+ message.decrementRefCount();
+
}
byte[] duplicateIDBytes = message.getDuplicateIDBytes();
@@ -1219,6 +1224,8 @@
{
context.getTransaction().markAsRollbackOnly(new HornetQException(HornetQException.DUPLICATE_ID_REJECTED, warnMessage));
}
+
+ message.decrementRefCount();
return false;
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-12-21 02:13:14 UTC (rev 11919)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/LargeMessageTest.java 2011-12-21 02:47:09 UTC (rev 11920)
@@ -30,6 +30,7 @@
import org.hornetq.core.client.impl.ClientConsumerInternal;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.persistence.impl.journal.LargeServerMessageImpl;
import org.hornetq.core.server.HornetQServer;
@@ -73,7 +74,7 @@
{
return false;
}
-
+
/**
*
*/
@@ -92,34 +93,32 @@
public void testRollbackPartiallyConsumedBuffer() throws Exception
{
- for (int i = 0 ; i < 1; i++)
+ for (int i = 0; i < 1; i++)
{
log.info("#test " + i);
internalTestRollbackPartiallyConsumedBuffer(false);
tearDown();
setUp();
-
+
}
-
+
}
-
+
public void testRollbackPartiallyConsumedBufferWithRedeliveryDelay() throws Exception
{
internalTestRollbackPartiallyConsumedBuffer(true);
}
-
-
+
private void internalTestRollbackPartiallyConsumedBuffer(final boolean redeliveryDelay) throws Exception
{
final int messageSize = 100 * 1024;
-
final ClientSession session;
try
{
server = createServer(true, isNetty());
-
+
AddressSettings settings = new AddressSettings();
if (redeliveryDelay)
{
@@ -130,7 +129,7 @@
}
}
settings.setMaxDeliveryAttempts(-1);
-
+
server.getAddressSettingsRepository().addMatch("#", settings);
server.start();
@@ -143,35 +142,36 @@
ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
- for (int i = 0 ; i < 20; i++)
+ for (int i = 0; i < 20; i++)
{
Message clientFile = createLargeClientMessage(session, messageSize, true);
-
+
clientFile.putIntProperty("value", i);
-
+
producer.send(clientFile);
}
session.commit();
session.start();
-
+
final CountDownLatch latch = new CountDownLatch(1);
-
+
final AtomicInteger errors = new AtomicInteger(0);
ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS);
-
+
consumer.setMessageHandler(new MessageHandler()
{
int counter = 0;
+
public void onMessage(ClientMessage message)
{
message.getBodyBuffer().readByte();
System.out.println("message:" + message);
try
{
- if (counter ++ < 20)
+ if (counter++ < 20)
{
Thread.sleep(100);
System.out.println("Rollback");
@@ -183,7 +183,7 @@
message.acknowledge();
session.commit();
}
-
+
if (counter == 40)
{
latch.countDown();
@@ -197,7 +197,7 @@
}
}
});
-
+
assertTrue(latch.await(40, TimeUnit.SECONDS));
consumer.close();
@@ -975,6 +975,74 @@
}
}
+ public void testSentWithDuplicateID() throws Exception
+ {
+ final int messageSize = 3 * HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+
+ ClientSession session = null;
+
+ try
+ {
+ server = createServer(true, isNetty());
+
+ server.start();
+
+ ClientSessionFactory sf = locator.createSessionFactory();
+
+ session = sf.createSession(true, true, 0);
+
+ session.createQueue(LargeMessageTest.ADDRESS, LargeMessageTest.ADDRESS, true);
+
+ ClientProducer producer = session.createProducer(LargeMessageTest.ADDRESS);
+
+ String someDuplicateInfo = "Anything";
+
+ for (int i = 0; i < 10; i++)
+ {
+ Message clientFile = createLargeClientMessage(session, messageSize, true);
+
+ clientFile.putBytesProperty(MessageImpl.HDR_BRIDGE_DUPLICATE_ID, someDuplicateInfo.getBytes());
+
+ producer.send(clientFile);
+ }
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ session.start();
+
+ ClientMessage msg = consumer.receive(10000);
+
+ assertNotNull(msg);
+
+ msg.acknowledge();
+
+ assertNull(consumer.receiveImmediate());
+
+ session.commit();
+
+ validateNoFilesOnLargeDir();
+
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+
+ try
+ {
+ session.close();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+ }
+
public void testResendSmallStreamMessage() throws Exception
{
internalTestResendMessage(50000);
@@ -1729,7 +1797,6 @@
100);
}
-
public void testPageOnLargeMessage() throws Exception
{
testPageOnLargeMessage(true, false);
@@ -2600,7 +2667,7 @@
}
}
}
-
+
// JBPAPP-6237
public void testPageOnLargeMessageMultipleQueues() throws Exception
{
@@ -2754,7 +2821,6 @@
}
-
// JBPAPP-6237
public void testPageOnLargeMessageMultipleQueues2() throws Exception
{
@@ -2796,7 +2862,7 @@
for (int i = 0; i < 100; i++)
{
ClientMessage message = session.createMessage(true);
-
+
message.putIntProperty("msgID", msgId++);
message.putBooleanProperty("TestLarge", false);
@@ -2813,7 +2879,6 @@
producer.send(message);
}
-
for (int i = 0; i < 10; i++)
{
ClientMessage clientFile = createLargeClientMessage(session, numberOfBytesBigMessage);
@@ -2830,34 +2895,34 @@
ClientConsumer consumer = session.createConsumer(LargeMessageTest.ADDRESS.concat("-" + ad));
session.start();
-
- for (int received = 0 ; received < 5; received++)
+
+ for (int received = 0; received < 5; received++)
{
for (int i = 0; i < 100; i++)
{
ClientMessage message2 = consumer.receive(LargeMessageTest.RECEIVE_WAIT_TIME);
-
+
Assert.assertNotNull(message2);
-
+
assertFalse(message2.getBooleanProperty("TestLarge"));
-
+
message2.acknowledge();
-
+
Assert.assertNotNull(message2);
}
-
+
for (int i = 0; i < 10; i++)
{
ClientMessage messageLarge = consumer.receive(RECEIVE_WAIT_TIME);
-
+
Assert.assertNotNull(messageLarge);
-
+
assertTrue(messageLarge.getBooleanProperty("TestLarge"));
-
+
ByteArrayOutputStream bout = new ByteArrayOutputStream();
-
+
messageLarge.acknowledge();
-
+
messageLarge.saveToOutputStream(bout);
byte[] body = bout.toByteArray();
assertEquals(numberOfBytesBigMessage, body.length);
@@ -2866,7 +2931,7 @@
assertEquals(getSamplebyte(bi), body[bi]);
}
}
-
+
session.rollback();
}
More information about the hornetq-commits mailing list