[hornetq-commits] JBoss hornetq SVN: r12240 - branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal.
do-not-reply at jboss.org
do-not-reply at jboss.org
Mon Mar 5 11:39:47 EST 2012
Author: jbertram
Date: 2012-03-05 11:39:46 -0500 (Mon, 05 Mar 2012)
New Revision: 12240
Modified:
branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
[HORNETQ-787] restored formatting
Modified: branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2012-03-05 16:27:05 UTC (rev 12239)
+++ branches/Branch_2_2_EAP_HORNETQ-787/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2012-03-05 16:39:46 UTC (rev 12240)
@@ -102,9 +102,9 @@
import org.hornetq.utils.XidCodecSupport;
/**
- *
+ *
* A JournalStorageManager
- *
+ *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
* @author <a href="jmesnil at redhat.com">Jeff Mesnil</a>
@@ -158,7 +158,7 @@
public static final byte PAGE_CURSOR_COUNTER_VALUE = 40;
public static final byte PAGE_CURSOR_COUNTER_INC = 41;
-
+
private final Semaphore pageMaxConcurrentIO;
private final BatchingIDGenerator idGenerator;
@@ -170,7 +170,7 @@
private final Journal bindingsJournal;
private final SequentialFileFactory largeMessagesFactory;
-
+
private SequentialFileFactory journalFF = null;
private volatile boolean started;
@@ -247,13 +247,13 @@
SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir, criticalErrorListener);
Journal localBindings = new JournalImpl(1024 * 1024,
- 2,
- config.getJournalCompactMinFiles(),
- config.getJournalCompactPercentage(),
- bindingsFF,
- "hornetq-bindings",
- "bindings",
- 1);
+ 2,
+ config.getJournalCompactMinFiles(),
+ config.getJournalCompactPercentage(),
+ bindingsFF,
+ "hornetq-bindings",
+ "bindings",
+ 1);
if (replicator != null)
{
@@ -280,20 +280,20 @@
JournalStorageManager.log.info("Using AIO Journal");
journalFF = new AIOSequentialFileFactory(journalDir,
- config.getJournalBufferSize_AIO(),
- config.getJournalBufferTimeout_AIO(),
- config.isLogJournalWriteRate(),
- criticalErrorListener);
+ config.getJournalBufferSize_AIO(),
+ config.getJournalBufferTimeout_AIO(),
+ config.isLogJournalWriteRate(),
+ criticalErrorListener);
}
else if (config.getJournalType() == JournalType.NIO)
{
JournalStorageManager.log.info("Using NIO Journal");
journalFF = new NIOSequentialFileFactory(journalDir,
- true,
- config.getJournalBufferSize_NIO(),
- config.getJournalBufferTimeout_NIO(),
- config.isLogJournalWriteRate(),
- criticalErrorListener);
+ true,
+ config.getJournalBufferSize_NIO(),
+ config.getJournalBufferTimeout_NIO(),
+ config.isLogJournalWriteRate(),
+ criticalErrorListener);
}
else
{
@@ -309,14 +309,14 @@
idGenerator = new BatchingIDGenerator(0, JournalStorageManager.CHECKPOINT_BATCH_SIZE, bindingsJournal);
}
Journal localMessage = new JournalImpl(config.getJournalFileSize(),
- config.getJournalMinFiles(),
- config.getJournalCompactMinFiles(),
- config.getJournalCompactPercentage(),
- journalFF,
- "hornetq-data",
- "hq",
- config.getJournalType() == JournalType.ASYNCIO ? config.getJournalMaxIO_AIO()
- : config.getJournalMaxIO_NIO());
+ config.getJournalMinFiles(),
+ config.getJournalCompactMinFiles(),
+ config.getJournalCompactPercentage(),
+ journalFF,
+ "hornetq-data",
+ "hq",
+ config.getJournalType() == JournalType.ASYNCIO ? config.getJournalMaxIO_AIO()
+ : config.getJournalMaxIO_NIO());
if (replicator != null)
{
@@ -332,7 +332,7 @@
largeMessagesFactory = new NIOSequentialFileFactory(largeMessagesDirectory, false, criticalErrorListener);
perfBlastPages = config.getJournalPerfBlastPages();
-
+
if (config.getPageMaxConcurrentIO() != 1)
{
pageMaxConcurrentIO = new Semaphore(config.getPageMaxConcurrentIO());
@@ -507,10 +507,10 @@
long recordID = generateUniqueID();
messageJournal.appendAddRecord(recordID,
- ADD_LARGE_MESSAGE_PENDING,
- new PendingLargeMessageEncoding(messageID),
- true,
- getContext(true));
+ ADD_LARGE_MESSAGE_PENDING,
+ new PendingLargeMessageEncoding(messageID),
+ true,
+ getContext(true));
return recordID;
}
@@ -519,8 +519,8 @@
{
installLargeMessageConfirmationOnTX(tx, recordID);
messageJournal.appendDeleteRecordTransactional(tx.getID(),
- recordID,
- new DeleteEncoding(ADD_LARGE_MESSAGE_PENDING, messageID));
+ recordID,
+ new DeleteEncoding(ADD_LARGE_MESSAGE_PENDING, messageID));
}
/** We don't need messageID now but we are likely to need it we ever decide to support a database */
@@ -542,18 +542,18 @@
if (message.isLargeMessage())
{
messageJournal.appendAddRecord(message.getMessageID(),
- JournalStorageManager.ADD_LARGE_MESSAGE,
- new LargeMessageEncoding((LargeServerMessage)message),
- false,
- getContext(false));
+ JournalStorageManager.ADD_LARGE_MESSAGE,
+ new LargeMessageEncoding((LargeServerMessage)message),
+ false,
+ getContext(false));
}
else
{
messageJournal.appendAddRecord(message.getMessageID(),
- JournalStorageManager.ADD_MESSAGE,
- message,
- false,
- getContext(false));
+ JournalStorageManager.ADD_MESSAGE,
+ message,
+ false,
+ getContext(false));
}
}
@@ -561,19 +561,19 @@
{
messageJournal.appendUpdateRecord(messageID,
- JournalStorageManager.ADD_REF,
- new RefEncoding(queueID),
- last && syncNonTransactional,
- getContext(last && syncNonTransactional));
+ JournalStorageManager.ADD_REF,
+ new RefEncoding(queueID),
+ last && syncNonTransactional,
+ getContext(last && syncNonTransactional));
}
public void storeAcknowledge(final long queueID, final long messageID) throws Exception
{
messageJournal.appendUpdateRecord(messageID,
- JournalStorageManager.ACKNOWLEDGE_REF,
- new RefEncoding(queueID),
- syncNonTransactional,
- getContext(syncNonTransactional));
+ JournalStorageManager.ACKNOWLEDGE_REF,
+ new RefEncoding(queueID),
+ syncNonTransactional,
+ getContext(syncNonTransactional));
}
public void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception
@@ -581,10 +581,10 @@
long ackID = idGenerator.generateID();
position.setRecordID(ackID);
messageJournal.appendAddRecord(ackID,
- ACKNOWLEDGE_CURSOR,
- new CursorAckRecordEncoding(queueID, position),
- syncNonTransactional,
- getContext(syncNonTransactional));
+ ACKNOWLEDGE_CURSOR,
+ new CursorAckRecordEncoding(queueID, position),
+ syncNonTransactional,
+ getContext(syncNonTransactional));
}
public void deleteMessage(final long messageID) throws Exception
@@ -599,13 +599,13 @@
public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception
{
ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue()
- .getID());
+ .getID());
messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
- JournalStorageManager.SET_SCHEDULED_DELIVERY_TIME,
- encoding,
- syncNonTransactional,
- getContext(syncNonTransactional));
+ JournalStorageManager.SET_SCHEDULED_DELIVERY_TIME,
+ encoding,
+ syncNonTransactional,
+ getContext(syncNonTransactional));
}
public void storeDuplicateID(final SimpleString address, final byte[] duplID, final long recordID) throws Exception
@@ -613,10 +613,10 @@
DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
messageJournal.appendAddRecord(recordID,
- JournalStorageManager.DUPLICATE_ID,
- encoding,
- syncNonTransactional,
- getContext(syncNonTransactional));
+ JournalStorageManager.DUPLICATE_ID,
+ encoding,
+ syncNonTransactional,
+ getContext(syncNonTransactional));
}
public void deleteDuplicateID(final long recordID) throws Exception
@@ -636,16 +636,16 @@
if (message.isLargeMessage())
{
messageJournal.appendAddRecordTransactional(txID,
- message.getMessageID(),
- JournalStorageManager.ADD_LARGE_MESSAGE,
- new LargeMessageEncoding(((LargeServerMessage)message)));
+ message.getMessageID(),
+ JournalStorageManager.ADD_LARGE_MESSAGE,
+ new LargeMessageEncoding(((LargeServerMessage)message)));
}
else
{
messageJournal.appendAddRecordTransactional(txID,
- message.getMessageID(),
- JournalStorageManager.ADD_MESSAGE,
- message);
+ message.getMessageID(),
+ JournalStorageManager.ADD_MESSAGE,
+ message);
}
}
@@ -655,43 +655,43 @@
pageTransaction.setRecordID(generateUniqueID());
messageJournal.appendAddRecordTransactional(txID,
- pageTransaction.getRecordID(),
- JournalStorageManager.PAGE_TRANSACTION,
- pageTransaction);
+ pageTransaction.getRecordID(),
+ JournalStorageManager.PAGE_TRANSACTION,
+ pageTransaction);
}
public void updatePageTransaction(final long txID, final PageTransactionInfo pageTransaction, final int depages) throws Exception
{
messageJournal.appendUpdateRecordTransactional(txID,
- pageTransaction.getRecordID(),
- JournalStorageManager.PAGE_TRANSACTION,
- new PageUpdateTXEncoding(pageTransaction.getTransactionID(),
- depages));
+ pageTransaction.getRecordID(),
+ JournalStorageManager.PAGE_TRANSACTION,
+ new PageUpdateTXEncoding(pageTransaction.getTransactionID(),
+ depages));
}
public void updatePageTransaction(final PageTransactionInfo pageTransaction, final int depages) throws Exception
{
messageJournal.appendUpdateRecord(pageTransaction.getRecordID(),
- JournalStorageManager.PAGE_TRANSACTION,
- new PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages),
- syncNonTransactional,
- getContext(syncNonTransactional));
+ JournalStorageManager.PAGE_TRANSACTION,
+ new PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages),
+ syncNonTransactional,
+ getContext(syncNonTransactional));
}
public void storeReferenceTransactional(final long txID, final long queueID, final long messageID) throws Exception
{
messageJournal.appendUpdateRecordTransactional(txID,
- messageID,
- JournalStorageManager.ADD_REF,
- new RefEncoding(queueID));
+ messageID,
+ JournalStorageManager.ADD_REF,
+ new RefEncoding(queueID));
}
public void storeAcknowledgeTransactional(final long txID, final long queueID, final long messageID) throws Exception
{
messageJournal.appendUpdateRecordTransactional(txID,
- messageID,
- JournalStorageManager.ACKNOWLEDGE_REF,
- new RefEncoding(queueID));
+ messageID,
+ JournalStorageManager.ACKNOWLEDGE_REF,
+ new RefEncoding(queueID));
}
/* (non-Javadoc)
@@ -702,9 +702,9 @@
long ackID = idGenerator.generateID();
position.setRecordID(ackID);
messageJournal.appendAddRecordTransactional(txID,
- ackID,
- ACKNOWLEDGE_CURSOR,
- new CursorAckRecordEncoding(queueID, position));
+ ackID,
+ ACKNOWLEDGE_CURSOR,
+ new CursorAckRecordEncoding(queueID, position));
}
/* (non-Javadoc)
@@ -720,10 +720,10 @@
long id = generateUniqueID();
messageJournal.appendAddRecord(id,
- JournalStorageManager.HEURISTIC_COMPLETION,
- new HeuristicCompletionEncoding(xid, isCommit),
- true,
- getContext(true));
+ JournalStorageManager.HEURISTIC_COMPLETION,
+ new HeuristicCompletionEncoding(xid, isCommit),
+ true,
+ getContext(true));
return id;
}
@@ -740,12 +740,12 @@
public void updateScheduledDeliveryTimeTransactional(final long txID, final MessageReference ref) throws Exception
{
ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue()
- .getID());
+ .getID());
messageJournal.appendUpdateRecordTransactional(txID,
- ref.getMessage().getMessageID(),
- JournalStorageManager.SET_SCHEDULED_DELIVERY_TIME,
- encoding);
+ ref.getMessage().getMessageID(),
+ JournalStorageManager.SET_SCHEDULED_DELIVERY_TIME,
+ encoding);
}
public void prepare(final long txID, final Xid xid) throws Exception
@@ -762,7 +762,7 @@
{
bindingsJournal.appendCommitRecord(txID, true);
}
-
+
public void rollbackBindings(final long txID) throws Exception
{
// no need to sync, it's going away anyways
@@ -820,14 +820,14 @@
{
ref.setPersistedCount(ref.getDeliveryCount());
DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(),
- ref.getDeliveryCount());
+ ref.getDeliveryCount());
messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
- JournalStorageManager.UPDATE_DELIVERY_COUNT,
- updateInfo,
+ JournalStorageManager.UPDATE_DELIVERY_COUNT,
+ updateInfo,
- syncNonTransactional,
- getContext(syncNonTransactional));
+ syncNonTransactional,
+ getContext(syncNonTransactional));
}
}
@@ -905,8 +905,8 @@
Map<Long, ServerMessage> messages = new HashMap<Long, ServerMessage>();
JournalLoadInformation info = messageJournal.load(records,
- preparedTransactions,
- new LargeMessageTXFailureCallback(messages));
+ preparedTransactions,
+ new LargeMessageTXFailureCallback(messages));
ArrayList<LargeServerMessage> largeMessages = new ArrayList<LargeServerMessage>();
@@ -1011,8 +1011,8 @@
if (queueMessages == null)
{
log.error("Cannot find queue messages for queueID=" + encoding.queueID +
- " on ack for messageID=" +
- messageID);
+ " on ack for messageID=" +
+ messageID);
}
else
{
@@ -1094,9 +1094,9 @@
if (queueMessages == null)
{
log.error("Cannot find queue messages " + encoding.queueID +
- " for message " +
- messageID +
- " while processing scheduled messages");
+ " for message " +
+ messageID +
+ " while processing scheduled messages");
}
else
{
@@ -1158,7 +1158,7 @@
{
log.info("Can't find queue " + encoding.queueID + " while reloading ACKNOWLEDGE_CURSOR, deleting record now");
messageJournal.appendDeleteRecord(record.id, false);
-
+
}
break;
@@ -1271,14 +1271,14 @@
}
loadPreparedTransactions(postOffice,
- pagingManager,
- resourceManager,
- queues,
- queueInfos,
- preparedTransactions,
- duplicateIDMap,
- pageSubscriptions,
- pendingLargeMessages);
+ pagingManager,
+ resourceManager,
+ queues,
+ queueInfos,
+ preparedTransactions,
+ duplicateIDMap,
+ pageSubscriptions,
+ pendingLargeMessages);
for (PageSubscription sub : pageSubscriptions.values())
{
@@ -1290,7 +1290,7 @@
if (msg.getRefCount() == 0)
{
JournalStorageManager.log.info("Large message: " + msg.getMessageID() +
- " didn't have any associated reference, file will be deleted");
+ " didn't have any associated reference, file will be deleted");
msg.decrementDelayDeletionCount();
}
}
@@ -1302,7 +1302,7 @@
log.info("Deleting unreferenced message id=" + msg.getMessageID() + " from the journal");
try
{
- deleteMessage(msg.getMessageID());
+ deleteMessage(msg.getMessageID());
}
catch (Exception ignored)
{
@@ -1370,8 +1370,8 @@
public void addGrouping(final GroupBinding groupBinding) throws Exception
{
GroupingEncoding groupingEncoding = new GroupingEncoding(groupBinding.getId(),
- groupBinding.getGroupId(),
- groupBinding.getClusterName());
+ groupBinding.getGroupId(),
+ groupBinding.getClusterName());
bindingsJournal.appendAddRecord(groupBinding.getId(), JournalStorageManager.GROUP_RECORD, groupingEncoding, true);
}
@@ -1391,12 +1391,12 @@
SimpleString filterString = filter == null ? null : filter.getFilterString();
PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(),
- binding.getAddress(),
- filterString);
+ binding.getAddress(),
+ filterString);
bindingsJournal.appendAddRecordTransactional(tx, binding.getID(),
- JournalStorageManager.QUEUE_BINDING_RECORD,
- bindingEncoding);
+ JournalStorageManager.QUEUE_BINDING_RECORD,
+ bindingEncoding);
}
public void deleteQueueBinding(final long queueBindingID) throws Exception
@@ -1411,9 +1411,9 @@
{
long recordID = idGenerator.generateID();
messageJournal.appendAddRecordTransactional(txID,
- recordID,
- JournalStorageManager.PAGE_CURSOR_COUNTER_INC,
- new PageCountRecordInc(queueID, value));
+ recordID,
+ JournalStorageManager.PAGE_CURSOR_COUNTER_INC,
+ new PageCountRecordInc(queueID, value));
return recordID;
}
@@ -1424,10 +1424,10 @@
{
long recordID = idGenerator.generateID();
messageJournal.appendAddRecord(recordID,
- JournalStorageManager.PAGE_CURSOR_COUNTER_INC,
- new PageCountRecordInc(queueID, value),
- true,
- getContext());
+ JournalStorageManager.PAGE_CURSOR_COUNTER_INC,
+ new PageCountRecordInc(queueID, value),
+ true,
+ getContext());
return recordID;
}
@@ -1438,9 +1438,9 @@
{
long recordID = idGenerator.generateID();
messageJournal.appendAddRecordTransactional(txID,
- recordID,
- JournalStorageManager.PAGE_CURSOR_COUNTER_VALUE,
- new PageCountRecord(queueID, value));
+ recordID,
+ JournalStorageManager.PAGE_CURSOR_COUNTER_VALUE,
+ new PageCountRecord(queueID, value));
return recordID;
}
@@ -1479,13 +1479,13 @@
ConfigurationImpl defaultValues = new ConfigurationImpl();
JournalImpl messagesJournal = new JournalImpl(defaultValues.getJournalFileSize(),
- defaultValues.getJournalMinFiles(),
- 0,
- 0,
- messagesFF,
- "hornetq-data",
- "hq",
- 1);
+ defaultValues.getJournalMinFiles(),
+ 0,
+ 0,
+ messagesFF,
+ "hornetq-data",
+ "hq",
+ 1);
describeJournal(messagesFF, messagesJournal);
}
@@ -1568,8 +1568,8 @@
cleanupIncompleteFiles();
singleThreadExecutor = Executors.newSingleThreadExecutor(new HornetQThreadFactory("HornetQ-IO-SingleThread",
- true,
- getThisClassLoader()));
+ true,
+ getThisClassLoader()));
bindingsJournal.start();
@@ -1762,9 +1762,9 @@
{
// for compatibility: couple with old behaviour, copying the old file to avoid message loss
long originalMessageID = largeMessage.getLongProperty(Message.HDR_ORIG_MESSAGE_ID);
-
+
SequentialFile currentFile = createFileForLargeMessage(largeMessage.getMessageID(), true);
-
+
if (!currentFile.exists())
{
SequentialFile linkedFile = createFileForLargeMessage(originalMessageID, true);
@@ -1774,7 +1774,7 @@
linkedFile.close();
}
}
-
+
currentFile.close();
}
@@ -1847,7 +1847,7 @@
if (queue == null)
{
log.warn("Message in prepared tx for queue " + encoding.queueID +
- " which does not exist. This message will be ignored.");
+ " which does not exist. This message will be ignored.");
}
else
@@ -1946,9 +1946,9 @@
encoding.position.setRecordID(record.id);
PageSubscription sub = locateSubscription(encoding.queueID,
- pageSubscriptions,
- queueInfos,
- pagingManager);
+ pageSubscriptions,
+ queueInfos,
+ pagingManager);
if (sub != null)
{
@@ -1975,9 +1975,9 @@
encoding.decode(buff);
PageSubscription sub = locateSubscription(encoding.queueID,
- pageSubscriptions,
- queueInfos,
- pagingManager);
+ pageSubscriptions,
+ queueInfos,
+ pagingManager);
if (sub != null)
{
@@ -1994,7 +1994,7 @@
default:
{
JournalStorageManager.log.warn("InternalError: Record type " + recordType +
- " not recognized. Maybe you're using journal files created on a different version");
+ " not recognized. Maybe you're using journal files created on a different version");
}
}
}
@@ -2331,13 +2331,13 @@
public String toString()
{
return "PersistentQueueBindingEncoding [id=" + id +
- ", name=" +
- name +
- ", address=" +
- address +
- ", filterString=" +
- filterString +
- "]";
+ ", name=" +
+ name +
+ ", address=" +
+ address +
+ ", filterString=" +
+ filterString +
+ "]";
}
public PersistentQueueBindingEncoding(final SimpleString name,
@@ -2391,7 +2391,7 @@
public int getEncodeSize()
{
return SimpleString.sizeofString(name) + SimpleString.sizeofString(address) +
- SimpleString.sizeofNullableString(filterString);
+ SimpleString.sizeofNullableString(filterString);
}
}
@@ -3047,12 +3047,12 @@
private static String describeRecord(RecordInfo info)
{
return "recordID=" + info.id +
- ";userRecordType=" +
- info.userRecordType +
- ";isUpdate=" +
- info.isUpdate +
- ";" +
- newObjectEncoding(info);
+ ";userRecordType=" +
+ info.userRecordType +
+ ";isUpdate=" +
+ info.isUpdate +
+ ";" +
+ newObjectEncoding(info);
}
private static String describeRecord(RecordInfo info, Object o)
@@ -3388,10 +3388,10 @@
public void onReadPrepareRecord(final long transactionID, final byte[] extraData, final int numberOfRecords) throws Exception
{
out.println("operation at Prepare,txID=" + transactionID +
- ",numberOfRecords=" +
- numberOfRecords +
- ",extraData=" +
- encode(extraData));
+ ",numberOfRecords=" +
+ numberOfRecords +
+ ",extraData=" +
+ encode(extraData));
}
public void onReadDeleteRecordTX(final long transactionID, final RecordInfo recordInfo) throws Exception
@@ -3625,9 +3625,9 @@
catch (Throwable e)
{
log.warn("Error while confirming large message completion on rollback for recordID=" + msg +
- "->" +
- e.getMessage(),
- e);
+ "->" +
+ e.getMessage(),
+ e);
}
}
}
@@ -3642,4 +3642,4 @@
}
-}
\ No newline at end of file
+}
More information about the hornetq-commits
mailing list