[hornetq-commits] JBoss hornetq SVN: r12086 - in trunk: hornetq-core/src/main/java/org/hornetq/core/replication/impl and 1 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Feb 6 09:29:26 EST 2012


Author: borges
Date: 2012-02-06 09:29:24 -0500 (Mon, 06 Feb 2012)
New Revision: 12086

Modified:
   trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageInSync.java
   trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
   trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java
Log:
HORNETQ-720 Fix sync of largeMessages that are being uploaded when sync starts.

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageInSync.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageInSync.java	2012-02-06 11:46:58 UTC (rev 12085)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/LargeServerMessageInSync.java	2012-02-06 14:29:24 UTC (rev 12086)
@@ -36,12 +36,15 @@
       SequentialFile mainSeqFile = mainLM.getFile();
       if (appendFile != null)
       {
+         appendFile.close();
+         appendFile.open();
          for (;;)
          {
             buffer.rewind();
-            int size = appendFile.read(buffer);
+            int bytesRead = appendFile.read(buffer);
+            if (bytesRead > 0)
             mainSeqFile.writeInternal(buffer);
-            if (size < buffer.capacity())
+            if (bytesRead < buffer.capacity())
             {
                break;
             }

Modified: trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2012-02-06 11:46:58 UTC (rev 12085)
+++ trunk/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2012-02-06 14:29:24 UTC (rev 12086)
@@ -548,7 +548,7 @@
          while (true)
          {
             buffer.clear();
-            int bytesRead = channel.read(buffer);
+            final int bytesRead = channel.read(buffer);
             int toSend = bytesRead;
             if (bytesRead > 0)
             {
@@ -566,7 +566,7 @@
             buffer.rewind();
 
             // sending -1 or 0 bytes will close the file at the backup
-            sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, bytesRead, buffer));
+            sendReplicatePacket(new ReplicationSyncFileMessage(content, pageStore, id, toSend, buffer));
             if (bytesRead == -1 || bytesRead == 0 || maxBytesToSend == 0)
                break;
          }

Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java	2012-02-06 11:46:58 UTC (rev 12085)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncJournalTest.java	2012-02-06 14:29:24 UTC (rev 12086)
@@ -103,7 +103,7 @@
    {
       session.start();
       ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
-      ClientMessage msg = consumer.receive(200);
+      ClientMessage msg = consumer.receiveImmediate();
       assertNull("there should be no more messages to receive! " + msg, msg);
       consumer.close();
       session.commit();

Modified: trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java
===================================================================
--- trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java	2012-02-06 11:46:58 UTC (rev 12085)
+++ trunk/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupSyncLargeMessageTest.java	2012-02-06 14:29:24 UTC (rev 12086)
@@ -7,7 +7,11 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import junit.framework.Assert;
+
+import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.api.core.HornetQException;
+import org.hornetq.api.core.client.ClientConsumer;
 import org.hornetq.api.core.client.ClientMessage;
 import org.hornetq.api.core.client.ClientProducer;
 import org.hornetq.api.core.client.ClientSession;
@@ -81,7 +85,8 @@
       session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
       final ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
       final ClientMessage message = session.createMessage(true);
-      message.setBodyInputStream(UnitTestCase.createFakeLargeStream(1000 * MIN_LARGE_MESSAGE));
+      final int largeMessageSize = 1000 * MIN_LARGE_MESSAGE;
+      message.setBodyInputStream(UnitTestCase.createFakeLargeStream(largeMessageSize));
 
       final AtomicBoolean caughtException = new AtomicBoolean(false);
       final CountDownLatch latch = new CountDownLatch(1);
@@ -115,6 +120,20 @@
       latch2.await();
       crash(session);
       assertFalse("no exceptions while sending message", caughtException.get());
+
+      session.start();
+      ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+      ClientMessage msg = consumer.receive(2000);
+      HornetQBuffer buffer = msg.getBodyBuffer();
+
+      for (int j = 0; j < largeMessageSize; j++)
+      {
+         Assert.assertTrue("large msg , expecting " + largeMessageSize + " bytes, got " + j, buffer.readable());
+         Assert.assertEquals("equal at " + j, UnitTestCase.getSamplebyte(j), buffer.readByte());
+      }
+      assertNull("there should be no more messages!", consumer.receiveImmediate());
+      consumer.close();
+      session.commit();
    }
 
    private Set<Long> getAllMessageFileIds(File dir)



More information about the hornetq-commits mailing list