Author: borges
Date: 2011-10-10 09:35:11 -0400 (Mon, 10 Oct 2011)
New Revision: 11501
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
Log:
HORNETQ-720 Make ReplicationEndpoint.stop() more robust, and do close all open files
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-10-10
13:35:00 UTC (rev 11500)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-10-10
13:35:11 UTC (rev 11501)
@@ -18,6 +18,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.util.Arrays;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
@@ -91,7 +92,7 @@
/** Files reserved in each journal for synchronization of existing data from the
'live' server. */
private final Map<JournalContent, Map<Long, JournalSyncFile>>
filesReservedForSync =
new HashMap<JournalContent, Map<Long, JournalSyncFile>>();
- private Map<Long, LargeServerMessage> largeMessagesOnSync = new HashMap<Long,
LargeServerMessage>();
+ private final Map<Long, LargeServerMessage> largeMessagesOnSync = new
HashMap<Long, LargeServerMessage>();
/**
* Used to hold the real Journals before the backup is synchronized. This field should
be
@@ -144,6 +145,7 @@
*/
public void handlePacket(final Packet packet)
{
+
PacketImpl response = new ReplicationResponseMessage();
final byte type=packet.getType();
@@ -230,7 +232,7 @@
return started;
}
- public void start() throws Exception
+ public synchronized void start() throws Exception
{
Configuration config = server.getConfiguration();
@@ -263,22 +265,19 @@
}
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQComponent#stop()
- */
- public void stop() throws Exception
+ public synchronized void stop() throws Exception
{
- if(!started)
+ if (!started)
{
return;
}
+
// This could be null if the backup server is being
// shut down without any live server connecting here
if (channel != null)
{
channel.close();
}
- storage.stop();
for (ConcurrentMap<Integer, Page> map : pageIndex.values())
{
@@ -301,11 +300,28 @@
{
largeMessage.releaseResources();
}
-
largeMessages.clear();
+ for (LargeServerMessage largeMessage : largeMessagesOnSync.values())
+ {
+ largeMessage.releaseResources();
+ }
+ largeMessagesOnSync.clear();
+
+ for (Entry<JournalContent, Map<Long, JournalSyncFile>> entry :
filesReservedForSync.entrySet())
+ {
+ for (JournalSyncFile filesReserved : entry.getValue().values())
+ {
+ filesReserved.close();
+ }
+ }
+
+ filesReservedForSync.clear();
pageManager.stop();
+ // Storage needs to be the last to stop
+ storage.stop();
+
started = false;
}
@@ -437,9 +453,9 @@
}
largeMessages.clear();
largeMessages.putAll(largeMessagesOnSync);
+ largeMessagesOnSync.clear();
}
}
- largeMessagesOnSync = null;
journalsHolder = null;
server.setRemoteBackupUpToDate();
log.info("Backup server " + server + " is synchronized with
live-server.");
@@ -524,15 +540,23 @@
return;
}
+
final Journal journal = journalsHolder.get(packet.getJournalContentType());
+ synchronized (this)
+ {
+ if (!started)
+ return;
+ Map<Long, JournalSyncFile> mapToFill =
filesReservedForSync.get(packet.getJournalContentType());
+ log.info("Journal " + packet.getJournalContentType() + ".
Reserving fileIDs for synchronization: " +
+ Arrays.toString(packet.getFileIds()));
- Map<Long, JournalSyncFile> mapToFill =
filesReservedForSync.get(packet.getJournalContentType());
for (Entry<Long, JournalFile> entry :
journal.createFilesForBackupSync(packet.getFileIds()).entrySet())
{
mapToFill.put(entry.getKey(), new JournalSyncFile(entry.getValue()));
}
registerJournal(packet.getJournalContentType().typeByte, new
FileWrapperJournal(journal));
- }
+ }
+ }
private void handleLargeMessageEnd(final ReplicationLargeMessageEndMessage packet)
{
@@ -601,9 +625,7 @@
log.warn("Large MessageID " + messageId +
" is not available on backup server.
Ignoring replication message");
}
-
return message;
-
}
/**
@@ -845,7 +867,8 @@
void close() throws IOException
{
- channel.close();
+ if (channel != null)
+ channel.close();
}
@Override