[hornetq-commits] JBoss hornetq SVN: r11081 - in branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core: protocol/core/impl and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Aug 1 06:31:11 EDT 2011


Author: borges
Date: 2011-08-01 06:31:11 -0400 (Mon, 01 Aug 2011)
New Revision: 11081

Modified:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
Log:
HORNETQ-720 Use a copy of the sequential file and a ByteBuffer to transfer
bytes

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-08-01 10:30:12 UTC (rev 11080)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-08-01 10:31:11 UTC (rev 11081)
@@ -393,7 +393,7 @@
     * @throws IOException
     * @throws HornetQException
     */
-   private void sendJournalFile(JournalFile[] journalFiles, JournalContent type) throws IOException, HornetQException
+   private void sendJournalFile(JournalFile[] journalFiles, JournalContent type) throws Exception
    {
       for (JournalFile jf : journalFiles)
       {

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-08-01 10:30:12 UTC (rev 11080)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java	2011-08-01 10:31:11 UTC (rev 11081)
@@ -155,7 +155,7 @@
                }
                catch (Exception e)
                {
-                  // XXX This is not what we want
+                  // XXX HORNETQ-720 This is not what we want
                   e.printStackTrace();
                   throw new RuntimeException(e);
                }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java	2011-08-01 10:30:12 UTC (rev 11080)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java	2011-08-01 10:31:11 UTC (rev 11081)
@@ -1,5 +1,7 @@
 package org.hornetq.core.protocol.core.impl.wireformat;
 
+import java.nio.ByteBuffer;
+
 import org.hornetq.api.core.HornetQBuffer;
 import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
 import org.hornetq.core.protocol.core.impl.PacketImpl;
@@ -14,24 +16,25 @@
 public final class ReplicationJournalFileMessage extends PacketImpl
 {
 
-   private byte[] data;
+   private ByteBuffer data;
    private int dataSize;
    private JournalContent journalType;
    /** This value refers to {@link org.hornetq.core.journal.impl.JournalFile#getFileID()} */
    private long fileId;
    private boolean backupIsUpToDate = false;
+   private byte[] byteArray;
 
    public ReplicationJournalFileMessage()
    {
       super(REPLICATION_SYNC);
    }
 
-   public ReplicationJournalFileMessage(int size, byte[] data, JournalContent content, long id)
+   public ReplicationJournalFileMessage(int size, ByteBuffer buffer, JournalContent content, long id)
    {
       this();
       this.fileId = id;
       this.dataSize = size;
-      this.data = data;
+      this.data = buffer;
       this.journalType = content;
    }
 
@@ -46,7 +49,7 @@
       // sending -1 will close the file
       if (dataSize > -1)
       {
-         buffer.writeBytes(data, 0, dataSize);
+         buffer.writeBytes(data);// (data, 0, dataSize);
       }
    }
 
@@ -63,8 +66,8 @@
       int size = buffer.readInt();
       if (size > -1)
       {
-         data = new byte[size];
-         buffer.readBytes(data);
+         byteArray = new byte[size];
+         buffer.readBytes(byteArray);
       }
    }
 
@@ -75,7 +78,7 @@
 
    public byte[] getData()
    {
-      return data;
+      return byteArray;
    }
 
    public JournalContent getJournalContent()

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java	2011-08-01 10:30:12 UTC (rev 11080)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java	2011-08-01 10:31:11 UTC (rev 11081)
@@ -13,7 +13,6 @@
 
 package org.hornetq.core.replication;
 
-import java.io.IOException;
 import java.util.Set;
 
 import org.hornetq.api.core.HornetQException;
@@ -90,8 +89,9 @@
    /**
     * Sends the whole content of the file to be duplicated.
     * @throws HornetQException
+    * @throws Exception
     */
-   void sendJournalFile(JournalFile jf, JournalContent type) throws IOException, HornetQException;
+   void sendJournalFile(JournalFile jf, JournalContent type) throws Exception;
 
    /**
     * Reserve the following fileIDs in the backup server.

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-08-01 10:30:12 UTC (rev 11080)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java	2011-08-01 10:31:11 UTC (rev 11081)
@@ -13,8 +13,7 @@
 
 package org.hornetq.core.replication.impl;
 
-import java.io.FileInputStream;
-import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.LinkedHashSet;
 import java.util.Queue;
 import java.util.Set;
@@ -26,6 +25,7 @@
 import org.hornetq.api.core.client.SessionFailureListener;
 import org.hornetq.core.journal.EncodingSupport;
 import org.hornetq.core.journal.JournalLoadInformation;
+import org.hornetq.core.journal.SequentialFile;
 import org.hornetq.core.journal.impl.JournalFile;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.paging.PagedMessage;
@@ -505,16 +505,23 @@
    }
 
    @Override
-   public void sendJournalFile(JournalFile jf, JournalContent content) throws IOException, HornetQException
+   public void sendJournalFile(JournalFile jf, JournalContent content) throws Exception
    {
-      FileInputStream file = new FileInputStream(jf.getFile().getFileName());
+      SequentialFile file = jf.getFile().copy();
+      if (!file.isOpen())
+      {
+         file.open(1, false);
+      }
       final long id = jf.getFileID();
-      final byte[] data = new byte[1 << 17]; // about 130 kB
+      final ByteBuffer buffer = ByteBuffer.allocate(1 << 17);
+
       while (true)
       {
-         int bytesRead = file.read(data);
-         // sending -1 bytes will close the file.
-         replicatingChannel.send(new ReplicationJournalFileMessage(bytesRead, data, content, id));
+         int bytesRead = file.read(buffer);
+         if (bytesRead > -1)
+            buffer.limit(bytesRead);
+         // sending -1 bytes will close the file at the backup
+         replicatingChannel.send(new ReplicationJournalFileMessage(bytesRead, buffer, content, id));
          if (bytesRead == -1)
             break;
       }



More information about the hornetq-commits mailing list