Author: borges
Date: 2012-02-06 09:29:24 -0500 (Mon, 06 Feb 2012)
New Revision: 12086
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageInSync.java
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java
Log:
HORNETQ-720 Fix sync of largeMessages that are being uploaded when sync starts.
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageInSync.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageInSync.java 2012-02-06 11:46:58 UTC (rev 12085)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageInSync.java 2012-02-06 14:29:24 UTC (rev 12086)
@@ -36,12 +36,15 @@
SequentialFile mainSeqFile = mainLM.getFile();
if (appendFile != null)
{
+ appendFile.close();
+ appendFile.open();
for (;;)
{
buffer.rewind();
- int size = appendFile.read(buffer);
+ int bytesRead = appendFile.read(buffer);
+ if (bytesRead > 0)
mainSeqFile.writeInternal(buffer);
- if (size < buffer.capacity())
+ if (bytesRead < buffer.capacity())
{
break;
}
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2012-02-06 11:46:58 UTC (rev 12085)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2012-02-06 14:29:24 UTC (rev 12086)
@@ -548,7 +548,7 @@
while (true)
{
buffer.clear();
- int bytesRead = channel.read(buffer);
+ final int bytesRead = channel.read(buffer);
int toSend = bytesRead;
if (bytesRead > 0)
{
@@ -566,7 +566,7 @@
buffer.rewind();
// sending -1 or 0 bytes will close the file at the backup
- sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id,
bytesRead, buffer));
+ sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id,
toSend, buffer));
if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
break;
}
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java 2012-02-06 11:46:58 UTC (rev 12085)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java 2012-02-06 14:29:24 UTC (rev 12086)
@@ -103,7 +103,7 @@
{
session.start();
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
- ClientMessage msg = consumer.receive(200);
+ ClientMessage msg = consumer.receiveImmediate();
assertNull("there should be no more messages to receive! " + msg, msg);
consumer.close();
session.commit();
Modified:
trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java 2012-02-06 11:46:58 UTC (rev 12085)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java 2012-02-06 14:29:24 UTC (rev 12086)
@@ -7,7 +7,11 @@
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
+import junit.framework.Assert;
+
+import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
@@ -81,7 +85,8 @@
session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null,
true);
final ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
final ClientMessage message = session.createMessage(true);
- message.setBodyInputStream(UnitTestCase.createFakeLargeStream(1000 *
MIN_LARGE_MESSAGE));
+ final int largeMessageSize = 1000 * MIN_LARGE_MESSAGE;
+ message.setBodyInputStream(UnitTestCase.createFakeLargeStream(largeMessageSize));
final AtomicBoolean caughtException = new AtomicBoolean(false);
final CountDownLatch latch = new CountDownLatch(1);
@@ -115,6 +120,20 @@
latch2.await();
crash(session);
assertFalse("no exceptions while sending message",
caughtException.get());
+
+ session.start();
+ ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+ ClientMessage msg = consumer.receive(2000);
+ HornetQBuffer buffer = msg.getBodyBuffer();
+
+ for (int j = 0; j < largeMessageSize; j++)
+ {
+ Assert.assertTrue("large msg , expecting " + largeMessageSize + "
bytes, got " + j, buffer.readable());
+ Assert.assertEquals("equal at " + j, UnitTestCase.getSamplebyte(j),
buffer.readByte());
+ }
+ assertNull("there should be no more messages!",
consumer.receiveImmediate());
+ consumer.close();
+ session.commit();
}
private Set<Long> getAllMessageFileIds(File dir)
Show replies by date