[hornetq-commits] JBoss hornetq SVN: r11084 - in branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core: protocol/core/impl/wireformat and 2 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Aug 1 08:50:41 EDT 2011


Author: borges
Date: 2011-08-01 08:50:41 -0400 (Mon, 01 Aug 2011)
New Revision: 11084

Modified:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HORNETQ-720 Fix handling of read(buffer) returning 0.

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-08-01 12:49:46 UTC (rev 11083)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-08-01 12:50:41 UTC (rev 11084)
@@ -366,15 +366,21 @@
       final JournalImpl localMessageJournal = (JournalImpl)messageJournal;
       final JournalImpl localBindingsJournal = (JournalImpl)bindingsJournal;
 
+      JournalFile[] messageFiles;
+      JournalFile[] bindingsFiles;
+
       localMessageJournal.writeLock();
       localBindingsJournal.writeLock();
-
-      JournalFile[] messageFiles = prepareJournalForCopy(localMessageJournal, JournalContent.MESSAGES);
-      JournalFile[] bindingsFiles = prepareJournalForCopy(localBindingsJournal, JournalContent.BINDINGS);
-
-      localMessageJournal.writeUnlock();
-      localBindingsJournal.writeUnlock();
-
+      try
+      {
+         messageFiles = prepareJournalForCopy(localMessageJournal, JournalContent.MESSAGES);
+         bindingsFiles = prepareJournalForCopy(localBindingsJournal, JournalContent.BINDINGS);
+      }
+      finally
+      {
+         localMessageJournal.writeUnlock();
+         localBindingsJournal.writeUnlock();
+      }
       bindingsJournal = new ReplicatedJournal(((byte)0), localBindingsJournal, replicator);
       messageJournal = new ReplicatedJournal((byte)1, localMessageJournal, replicator);
 

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java	2011-08-01 12:49:46 UTC (rev 11083)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java	2011-08-01 12:50:41 UTC (rev 11084)
@@ -47,7 +47,7 @@
       buffer.writeByte(journalType.typeByte);
       buffer.writeInt(dataSize);
       // sending -1 will close the file
-      if (dataSize > -1)
+      if (dataSize > 0)
       {
          buffer.writeBytes(data);// (data, 0, dataSize);
       }
@@ -64,7 +64,7 @@
       }
       journalType = JournalContent.getType(buffer.readByte());
       int size = buffer.readInt();
-      if (size > -1)
+      if (size > 0)
       {
          byteArray = new byte[size];
          buffer.readBytes(byteArray);

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-08-01 12:49:46 UTC (rev 11083)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2011-08-01 12:50:41 UTC (rev 11084)
@@ -377,7 +377,7 @@
             JournalImpl journal = (JournalImpl)j;
             journal.finishRemoteBackupSync();
          }
-         server.setRemoteBackupUpToDate(true);
+         server.setRemoteBackupUpToDate();
          return;
       }
 

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-08-01 12:49:46 UTC (rev 11083)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-08-01 12:50:41 UTC (rev 11084)
@@ -508,25 +508,25 @@
    public void sendJournalFile(JournalFile jf, JournalContent content) throws Exception
    {
       SequentialFile file = jf.getFile().copy();
+      log.info("Replication: sending " + jf + " to backup. " + file);
       if (!file.isOpen())
       {
          file.open(1, false);
       }
       final long id = jf.getFileID();
       final ByteBuffer buffer = ByteBuffer.allocate(1 << 17);
-
       while (true)
       {
          int bytesRead = file.read(buffer);
-         if (bytesRead > -1)
+         if (bytesRead > 0)
             buffer.limit(bytesRead);
-         // sending -1 bytes will close the file at the backup
+         buffer.rewind();
+
+         // sending -1 or 0 bytes will close the file at the backup
          replicatingChannel.send(new ReplicationJournalFileMessage(bytesRead, buffer, content, id));
-         if (bytesRead == -1)
+         if (bytesRead == -1 || bytesRead == 0)
             break;
       }
-      // XXX probably need to sync the JournalFile(?)
-      throw new UnsupportedOperationException();
    }
 
    @Override

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-08-01 12:49:46 UTC (rev 11083)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-08-01 12:50:41 UTC (rev 11084)
@@ -2046,8 +2046,8 @@
       return backupUpToDate;
    }
 
-   public void setRemoteBackupUpToDate(boolean isUpToDate)
+   public void setRemoteBackupUpToDate()
    {
-      backupUpToDate = isUpToDate;
+      backupUpToDate = true;
    }
 }



More information about the hornetq-commits mailing list