Author: timfox
Date: 2010-09-21 05:52:36 -0400 (Tue, 21 Sep 2010)
New Revision: 9706
Modified:
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
https://jira.jboss.org/browse/HORNETQ-506
Modified:
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-09-21
09:31:13 UTC (rev 9705)
+++
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-09-21
09:52:36 UTC (rev 9706)
@@ -168,11 +168,10 @@
private final String journalDir;
private final String largeMessagesDirectory;
-
-
+
// Persisted core configuration
private final Map<SimpleString, PersistedRoles> mapPersistedRoles = new
ConcurrentHashMap<SimpleString, PersistedRoles>();
-
+
private final Map<SimpleString, PersistedAddressSetting>
mapPersistedAddressSettings = new ConcurrentHashMap<SimpleString,
PersistedAddressSetting>();
public JournalStorageManager(final Configuration config, final ExecutorFactory
executorFactory)
@@ -585,7 +584,11 @@
public void updatePageTransaction(final long txID, final PageTransactionInfo
pageTransaction, final int depages) throws Exception
{
- messageJournal.appendUpdateRecordTransactional(txID, pageTransaction.getRecordID(),
JournalStorageManager.PAGE_TRANSACTION, new
PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages));
+ messageJournal.appendUpdateRecordTransactional(txID,
+ pageTransaction.getRecordID(),
+
JournalStorageManager.PAGE_TRANSACTION,
+ new
PageUpdateTXEncoding(pageTransaction.getTransactionID(),
+ depages));
}
public void storeReferenceTransactional(final long txID, final long queueID, final
long messageID) throws Exception
@@ -697,8 +700,7 @@
getContext(syncNonTransactional));
}
-
-
+
public void storeAddressSetting(PersistedAddressSetting addressSetting) throws
Exception
{
deleteAddressSetting(addressSetting.getAddressMatch());
@@ -707,14 +709,13 @@
bindingsJournal.appendAddRecord(id, ADDRESS_SETTING_RECORD, addressSetting, true);
mapPersistedAddressSettings.put(addressSetting.getAddressMatch(), addressSetting);
}
-
+
public List<PersistedAddressSetting> recoverAddressSettings() throws Exception
{
ArrayList<PersistedAddressSetting> list = new
ArrayList<PersistedAddressSetting>(mapPersistedAddressSettings.size());
list.addAll(mapPersistedAddressSettings.values());
return list;
}
-
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#recoverPersistedRoles()
@@ -746,9 +747,9 @@
{
bindingsJournal.appendDeleteRecord(oldSetting.getStoreId(), false);
}
-
+
}
-
+
public void deleteSecurityRoles(SimpleString addressMatch) throws Exception
{
PersistedRoles oldRoles = mapPersistedRoles.remove(addressMatch);
@@ -758,8 +759,6 @@
}
}
-
-
public JournalLoadInformation loadMessageJournal(final PostOffice postOffice,
final PagingManager pagingManager,
final ResourceManager
resourceManager,
@@ -771,7 +770,7 @@
List<PreparedTransactionInfo> preparedTransactions = new
ArrayList<PreparedTransactionInfo>();
Map<Long, ServerMessage> messages = new HashMap<Long,
ServerMessage>();
-
+
JournalLoadInformation info = messageJournal.load(records,
preparedTransactions,
new
LargeMessageTXFailureCallback(messages));
@@ -781,17 +780,17 @@
Map<Long, Map<Long, AddMessageRecord>> queueMap = new HashMap<Long,
Map<Long, AddMessageRecord>>();
final int totalSize = records.size();
-
- for (int reccount = 0 ; reccount < totalSize; reccount++)
+
+ for (int reccount = 0; reccount < totalSize; reccount++)
{
// It will show log.info only with large journals (more than 1 million records)
- if (reccount> 0 && reccount % 1000000 == 0)
+ if (reccount > 0 && reccount % 1000000 == 0)
{
long percent = (long)((((double)reccount) / ((double)totalSize)) * 100f);
-
+
log.info(percent + "% loaded");
}
-
+
RecordInfo record = records.get(reccount);
byte[] data = record.data;
@@ -885,15 +884,15 @@
if (queueMessages == null)
{
- log.warn("Cannot find queue " + encoding.queueID + " to
update delivery count");
+ log.warn("Cannot find queue " + encoding.queueID + " to
update delivery count");
}
else
{
AddMessageRecord rec = queueMessages.get(messageID);
-
+
if (rec == null)
{
- log.warn("Cannot find message " + messageID + " to
update delivery count");
+ log.warn("Cannot find message " + messageID + " to
update delivery count");
}
else
{
@@ -908,21 +907,21 @@
if (record.isUpdate)
{
PageUpdateTXEncoding pageUpdate = new PageUpdateTXEncoding();
-
+
pageUpdate.decode(buff);
-
+
PageTransactionInfo pageTX =
pagingManager.getTransaction(pageUpdate.pageTX);
-
+
pageTX.update(pageUpdate.recods, null, null);
}
else
{
PageTransactionInfoImpl pageTransactionInfo = new
PageTransactionInfoImpl();
-
+
pageTransactionInfo.decode(buff);
-
+
pageTransactionInfo.setRecordID(record.id);
-
+
pagingManager.addTransaction(pageTransactionInfo);
}
@@ -985,13 +984,13 @@
throw new IllegalStateException("Invalid record type " +
recordType);
}
}
-
+
// This will free up memory sooner. The record is not needed any more
// and its byte array would consume memory during the load process even though
it's not necessary any longer
// what would delay processing time during load
records.set(reccount, null);
}
-
+
// Release the memory as soon as not needed any longer
records.clear();
records = null;
@@ -1003,7 +1002,14 @@
Map<Long, AddMessageRecord> queueRecords = entry.getValue();
Queue queue = queues.get(queueID);
-
+
+ if (queue == null)
+ {
+ log.warn("Message for queue " + queueID + " which does not
exist. This message will be ignored.");
+
+ continue;
+ }
+
Collection<AddMessageRecord> valueRecords = queueRecords.values();
for (AddMessageRecord record : valueRecords)
@@ -1037,7 +1043,7 @@
msg.decrementDelayDeletionCount();
}
}
-
+
for (ServerMessage msg : messages.values())
{
if (msg.getRefCount() == 0)
@@ -1053,7 +1059,7 @@
}
}
}
-
+
if (perfBlastPages != -1)
{
messageJournal.perfBlast(perfBlastPages);
@@ -1418,18 +1424,22 @@
if (queue == null)
{
- throw new IllegalStateException("Cannot find queue with id
" + encoding.queueID);
+ log.warn("Message in prepared tx for queue " +
encoding.queueID +
+ " which does not exist. This message will be
ignored.");
+
}
+ else
+ {
+ ServerMessage message = messages.get(messageID);
- ServerMessage message = messages.get(messageID);
+ if (message == null)
+ {
+ throw new IllegalStateException("Cannot find message with id
" + messageID);
+ }
- if (message == null)
- {
- throw new IllegalStateException("Cannot find message with id
" + messageID);
+ postOffice.reroute(message, queue, tx);
}
- postOffice.reroute(message, queue, tx);
-
break;
}
case ACKNOWLEDGE_REF:
@@ -1446,7 +1456,7 @@
{
throw new IllegalStateException("Cannot find queue with id
" + encoding.queueID);
}
-
+
// TODO - this involves a scan - we should find a quicker way of doing
it
MessageReference removed = queue.removeReferenceWithID(messageID);
@@ -2017,24 +2027,24 @@
super(queueID);
}
}
-
+
private static class PageUpdateTXEncoding implements EncodingSupport
{
-
+
public long pageTX;
-
+
public int recods;
-
+
public PageUpdateTXEncoding()
{
}
-
+
public PageUpdateTXEncoding(final long pageTX, final int records)
{
this.pageTX = pageTX;
this.recods = records;
}
-
+
public void decode(HornetQBuffer buffer)
{
this.pageTX = buffer.readLong();
@@ -2057,7 +2067,7 @@
{
return DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
}
-
+
}
private static class ScheduledDeliveryEncoding extends QueueEncoding
@@ -2182,6 +2192,7 @@
}
}
+
private static final class AddMessageRecord
{
public AddMessageRecord(final ServerMessage message)
@@ -2194,7 +2205,7 @@
long scheduledDeliveryTime;
int deliveryCount;
-
+
boolean referenced = false;
}