Author: borges
Date: 2011-08-01 06:31:11 -0400 (Mon, 01 Aug 2011)
New Revision: 11081
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
Log:
HORNETQ-720 Use a copy of the sequential file and a ByteBuffer to transfer
bytes
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-01
10:30:12 UTC (rev 11080)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-01
10:31:11 UTC (rev 11081)
@@ -393,7 +393,7 @@
* @throws IOException
* @throws HornetQException
*/
- private void sendJournalFile(JournalFile[] journalFiles, JournalContent type) throws
IOException, HornetQException
+ private void sendJournalFile(JournalFile[] journalFiles, JournalContent type) throws
Exception
{
for (JournalFile jf : journalFiles)
{
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-08-01
10:30:12 UTC (rev 11080)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/CoreProtocolManager.java 2011-08-01
10:31:11 UTC (rev 11081)
@@ -155,7 +155,7 @@
}
catch (Exception e)
{
- // XXX This is not what we want
+ // XXX HORNETQ-720 This is not what we want
e.printStackTrace();
throw new RuntimeException(e);
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java 2011-08-01
10:30:12 UTC (rev 11080)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java 2011-08-01
10:31:11 UTC (rev 11081)
@@ -1,5 +1,7 @@
package org.hornetq.core.protocol.core.impl.wireformat;
+import java.nio.ByteBuffer;
+
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
import org.hornetq.core.protocol.core.impl.PacketImpl;
@@ -14,24 +16,25 @@
public final class ReplicationJournalFileMessage extends PacketImpl
{
- private byte[] data;
+ private ByteBuffer data;
private int dataSize;
private JournalContent journalType;
/** This value refers to {@link org.hornetq.core.journal.impl.JournalFile#getFileID()}
*/
private long fileId;
private boolean backupIsUpToDate = false;
+ private byte[] byteArray;
public ReplicationJournalFileMessage()
{
super(REPLICATION_SYNC);
}
- public ReplicationJournalFileMessage(int size, byte[] data, JournalContent content,
long id)
+ public ReplicationJournalFileMessage(int size, ByteBuffer buffer, JournalContent
content, long id)
{
this();
this.fileId = id;
this.dataSize = size;
- this.data = data;
+ this.data = buffer;
this.journalType = content;
}
@@ -46,7 +49,7 @@
// sending -1 will close the file
if (dataSize > -1)
{
- buffer.writeBytes(data, 0, dataSize);
+ buffer.writeBytes(data);// (data, 0, dataSize);
}
}
@@ -63,8 +66,8 @@
int size = buffer.readInt();
if (size > -1)
{
- data = new byte[size];
- buffer.readBytes(data);
+ byteArray = new byte[size];
+ buffer.readBytes(byteArray);
}
}
@@ -75,7 +78,7 @@
public byte[] getData()
{
- return data;
+ return byteArray;
}
public JournalContent getJournalContent()
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-08-01
10:30:12 UTC (rev 11080)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/ReplicationManager.java 2011-08-01
10:31:11 UTC (rev 11081)
@@ -13,7 +13,6 @@
package org.hornetq.core.replication;
-import java.io.IOException;
import java.util.Set;
import org.hornetq.api.core.HornetQException;
@@ -90,8 +89,9 @@
/**
* Sends the whole content of the file to be duplicated.
* @throws HornetQException
+ * @throws Exception
*/
- void sendJournalFile(JournalFile jf, JournalContent type) throws IOException,
HornetQException;
+ void sendJournalFile(JournalFile jf, JournalContent type) throws Exception;
/**
* Reserve the following fileIDs in the backup server.
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-08-01
10:30:12 UTC (rev 11080)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-08-01
10:31:11 UTC (rev 11081)
@@ -13,8 +13,7 @@
package org.hornetq.core.replication.impl;
-import java.io.FileInputStream;
-import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.LinkedHashSet;
import java.util.Queue;
import java.util.Set;
@@ -26,6 +25,7 @@
import org.hornetq.api.core.client.SessionFailureListener;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.JournalLoadInformation;
+import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.impl.JournalFile;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PagedMessage;
@@ -505,16 +505,23 @@
}
@Override
- public void sendJournalFile(JournalFile jf, JournalContent content) throws
IOException, HornetQException
+ public void sendJournalFile(JournalFile jf, JournalContent content) throws Exception
{
- FileInputStream file = new FileInputStream(jf.getFile().getFileName());
+ SequentialFile file = jf.getFile().copy();
+ if (!file.isOpen())
+ {
+ file.open(1, false);
+ }
final long id = jf.getFileID();
- final byte[] data = new byte[1 << 17]; // about 130 kB
+ final ByteBuffer buffer = ByteBuffer.allocate(1 << 17);
+
while (true)
{
- int bytesRead = file.read(data);
- // sending -1 bytes will close the file.
- replicatingChannel.send(new ReplicationJournalFileMessage(bytesRead, data,
content, id));
+ int bytesRead = file.read(buffer);
+ if (bytesRead > -1)
+ buffer.limit(bytesRead);
+ // sending -1 bytes will close the file at the backup
+ replicatingChannel.send(new ReplicationJournalFileMessage(bytesRead, buffer,
content, id));
if (bytesRead == -1)
break;
}