Author: borges
Date: 2011-08-01 08:50:41 -0400 (Mon, 01 Aug 2011)
New Revision: 11084
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HORNETQ-720 Fix handling of read(buffer) returning 0.
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-01 12:49:46 UTC (rev 11083)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-01 12:50:41 UTC (rev 11084)
@@ -366,15 +366,21 @@
final JournalImpl localMessageJournal = (JournalImpl)messageJournal;
final JournalImpl localBindingsJournal = (JournalImpl)bindingsJournal;
+ JournalFile[] messageFiles;
+ JournalFile[] bindingsFiles;
+
localMessageJournal.writeLock();
localBindingsJournal.writeLock();
-
- JournalFile[] messageFiles = prepareJournalForCopy(localMessageJournal, JournalContent.MESSAGES);
- JournalFile[] bindingsFiles = prepareJournalForCopy(localBindingsJournal, JournalContent.BINDINGS);
-
- localMessageJournal.writeUnlock();
- localBindingsJournal.writeUnlock();
-
+ try
+ {
+ messageFiles = prepareJournalForCopy(localMessageJournal, JournalContent.MESSAGES);
+ bindingsFiles = prepareJournalForCopy(localBindingsJournal, JournalContent.BINDINGS);
+ }
+ finally
+ {
+ localMessageJournal.writeUnlock();
+ localBindingsJournal.writeUnlock();
+ }
bindingsJournal = new ReplicatedJournal(((byte)0), localBindingsJournal, replicator);
messageJournal = new ReplicatedJournal((byte)1, localMessageJournal, replicator);
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java 2011-08-01 12:49:46 UTC (rev 11083)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/protocol/core/impl/wireformat/ReplicationJournalFileMessage.java 2011-08-01 12:50:41 UTC (rev 11084)
@@ -47,7 +47,7 @@
buffer.writeByte(journalType.typeByte);
buffer.writeInt(dataSize);
// sending -1 will close the file
- if (dataSize > -1)
+ if (dataSize > 0)
{
buffer.writeBytes(data);// (data, 0, dataSize);
}
@@ -64,7 +64,7 @@
}
journalType = JournalContent.getType(buffer.readByte());
int size = buffer.readInt();
- if (size > -1)
+ if (size > 0)
{
byteArray = new byte[size];
buffer.readBytes(byteArray);
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-01 12:49:46 UTC (rev 11083)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-01 12:50:41 UTC (rev 11084)
@@ -377,7 +377,7 @@
JournalImpl journal = (JournalImpl)j;
journal.finishRemoteBackupSync();
}
- server.setRemoteBackupUpToDate(true);
+ server.setRemoteBackupUpToDate();
return;
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-08-01 12:49:46 UTC (rev 11083)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2011-08-01 12:50:41 UTC (rev 11084)
@@ -508,25 +508,25 @@
public void sendJournalFile(JournalFile jf, JournalContent content) throws Exception
{
SequentialFile file = jf.getFile().copy();
+ log.info("Replication: sending " + jf + " to backup. " + file);
if (!file.isOpen())
{
file.open(1, false);
}
final long id = jf.getFileID();
final ByteBuffer buffer = ByteBuffer.allocate(1 << 17);
-
while (true)
{
int bytesRead = file.read(buffer);
- if (bytesRead > -1)
+ if (bytesRead > 0)
buffer.limit(bytesRead);
- // sending -1 bytes will close the file at the backup
+ buffer.rewind();
+
+ // sending -1 or 0 bytes will close the file at the backup
replicatingChannel.send(new ReplicationJournalFileMessage(bytesRead, buffer, content, id));
- if (bytesRead == -1)
+ if (bytesRead == -1 || bytesRead == 0)
break;
}
- // XXX probably need to sync the JournalFile(?)
- throw new UnsupportedOperationException();
}
@Override
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-08-01 12:49:46 UTC (rev 11083)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-08-01 12:50:41 UTC (rev 11084)
@@ -2046,8 +2046,8 @@
return backupUpToDate;
}
- public void setRemoteBackupUpToDate(boolean isUpToDate)
+ public void setRemoteBackupUpToDate()
{
- backupUpToDate = isUpToDate;
+ backupUpToDate = true;
}
}
Show replies by date