Author: borges
Date: 2011-10-28 09:00:40 -0400 (Fri, 28 Oct 2011)
New Revision: 11615
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
Use locks on confirmPendingLargeMessage(|TX), methods were added on merge from 2_2_EAP.
Modified:
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-10-28
13:00:26 UTC (rev 11614)
+++
trunk/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-10-28
13:00:40 UTC (rev 11615)
@@ -349,66 +349,66 @@
return replicator != null;
}
- /**
- * @param replicationManager
- * @param pagingManager
- * @throws HornetQException
- */
+ /**
+ * @param replicationManager
+ * @param pagingManager
+ * @throws HornetQException
+ */
@Override
- public void startReplication(ReplicationManager replicationManager, PagingManager
pagingManager) throws Exception
- {
- if (!started)
- {
- throw new IllegalStateException("JournalStorageManager must be
started...");
- }
- assert replicationManager != null;
+ public void startReplication(ReplicationManager replicationManager, PagingManager
pagingManager) throws Exception
+ {
+ if (!started)
+ {
+ throw new IllegalStateException("JournalStorageManager must be
started...");
+ }
+ assert replicationManager != null;
- if (!(messageJournal instanceof JournalImpl) || !(bindingsJournal instanceof
JournalImpl))
- {
- throw new HornetQException(HornetQException.INTERNAL_ERROR,
- "journals here are not JournalImpl. You
can't set a replicator!");
- }
- JournalFile[] messageFiles = null;
- JournalFile[] bindingsFiles = null;
+ if (!(messageJournal instanceof JournalImpl) || !(bindingsJournal instanceof
JournalImpl))
+ {
+ throw new HornetQException(HornetQException.INTERNAL_ERROR,
+ "journals here are not JournalImpl. You
can't set a replicator!");
+ }
+ JournalFile[] messageFiles = null;
+ JournalFile[] bindingsFiles = null;
- final Journal localMessageJournal = messageJournal;
- final Journal localBindingsJournal = bindingsJournal;
+ final Journal localMessageJournal = messageJournal;
+ final Journal localBindingsJournal = bindingsJournal;
- Map<String, Long> largeMessageFilesToSync;
- Map<SimpleString, Collection<Integer>> pageFilesToSync;
- storageManagerLock.writeLock().lock();
+ Map<String, Long> largeMessageFilesToSync;
+ Map<SimpleString, Collection<Integer>> pageFilesToSync;
+ storageManagerLock.writeLock().lock();
try
- {
- replicator = replicationManager;
- localMessageJournal.synchronizationLock();
- localBindingsJournal.synchronizationLock();
+ {
+ replicator = replicationManager;
+ localMessageJournal.synchronizationLock();
+ localBindingsJournal.synchronizationLock();
try
- {
+ {
pagingManager.lock();
try
- {
- messageFiles = prepareJournalForCopy(localMessageJournal,
JournalContent.MESSAGES);
- bindingsFiles = prepareJournalForCopy(localBindingsJournal,
JournalContent.BINDINGS);
- pageFilesToSync = getPageInformationForSync(pagingManager);
- largeMessageFilesToSync = getLargeMessageInformation();
- }
+ {
+ messageFiles = prepareJournalForCopy(localMessageJournal,
JournalContent.MESSAGES);
+ bindingsFiles = prepareJournalForCopy(localBindingsJournal,
JournalContent.BINDINGS);
+ pageFilesToSync = getPageInformationForSync(pagingManager);
+ largeMessageFilesToSync = getLargeMessageInformation();
+ }
finally
- {
+ {
pagingManager.unlock();
- }
- }
+ }
+ }
finally
- {
- localMessageJournal.synchronizationUnlock();
- localBindingsJournal.synchronizationUnlock();
- }
+ {
+ localMessageJournal.synchronizationUnlock();
+ localBindingsJournal.synchronizationUnlock();
+ }
bindingsJournal = new ReplicatedJournal(((byte)0), localBindingsJournal,
replicator);
messageJournal = new ReplicatedJournal((byte)1, localMessageJournal,
replicator);
- }
+ }
finally
- {
- storageManagerLock.writeLock().unlock();
- }
+ {
+ storageManagerLock.writeLock().unlock();
+ }
sendJournalFile(messageFiles, JournalContent.MESSAGES);
sendJournalFile(bindingsFiles, JournalContent.BINDINGS);
@@ -416,36 +416,35 @@
sendPagesToBackup(pageFilesToSync, pagingManager);
storageManagerLock.writeLock().lock();
- try
- {
- replicator.sendSynchronizationDone();
- // XXX HORNETQ-720 SEND a compare journals message?
- }
- finally
- {
- storageManagerLock.writeLock().unlock();
- }
- }
+ try
+ {
+ replicator.sendSynchronizationDone();
+ // XXX HORNETQ-720 SEND a compare journals message?
+ }
+ finally
+ {
+ storageManagerLock.writeLock().unlock();
+ }
+ }
+ /**
+ * @param pageFilesToSync
+ * @throws Exception
+ */
+ private void sendPagesToBackup(Map<SimpleString, Collection<Integer>>
pageFilesToSync, PagingManager manager)
+ throws Exception
+ {
+ for (Entry<SimpleString, Collection<Integer>> entry :
pageFilesToSync.entrySet())
+ {
+ if (!started)
+ return;
+ PagingStore store = manager.getPageStore(entry.getKey());
+ store.sendPages(replicator, entry.getValue());
+ }
+ }
/**
- * @param pageFilesToSync
- * @throws Exception
- */
- private void sendPagesToBackup(Map<SimpleString, Collection<Integer>>
pageFilesToSync, PagingManager manager)
- throws Exception
- {
- for (Entry<SimpleString, Collection<Integer>> entry :
pageFilesToSync.entrySet())
- {
- if (!started)
- return;
- PagingStore store = manager.getPageStore(entry.getKey());
- store.sendPages(replicator, entry.getValue());
- }
- }
-
- /**
* @param pagingManager
* @return
* @throws Exception
@@ -573,16 +572,18 @@
}
}
- /* (non-Javadoc)
- * @see
org.hornetq.core.persistence.StorageManager#pageWrite(org.hornetq.utils.SimpleString, int,
org.hornetq.api.core.buffers.ChannelBuffer)
- */
- public void pageWrite(final PagedMessage message, final int pageNumber)
- {
- if (isReplicated())
- {
- replicator.pageWrite(message, pageNumber);
- }
- }
+ /*
+ * (non-Javadoc)
+ * @see
org.hornetq.core.persistence.StorageManager#pageWrite(org.hornetq.utils.SimpleString,
+ * int, org.hornetq.api.core.buffers.ChannelBuffer)
+ */
+ public void pageWrite(final PagedMessage message, final int pageNumber)
+ {
+ if (isReplicated())
+ {
+ replicator.pageWrite(message, pageNumber);
+ }
+ }
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#getContext()
@@ -637,58 +638,58 @@
return new LargeServerMessageImpl(this);
}
- protected final void addBytesToLargeMessage(final SequentialFile file, final long
messageId, final byte[] bytes)
- throws Exception
- {
- readLock();
+ protected final void addBytesToLargeMessage(final SequentialFile file, final long
messageId, final byte[] bytes)
+ throws Exception
+ {
+ readLock();
try
- {
- file.position(file.size());
+ {
+ file.position(file.size());
- file.writeDirect(ByteBuffer.wrap(bytes), false);
+ file.writeDirect(ByteBuffer.wrap(bytes), false);
- if (isReplicated())
- {
- replicator.largeMessageWrite(messageId, bytes);
- }
- }
+ if (isReplicated())
+ {
+ replicator.largeMessageWrite(messageId, bytes);
+ }
+ }
finally
- {
- readUnLock();
- }
- }
+ {
+ readUnLock();
+ }
+ }
public LargeServerMessage createLargeMessage(final long id, final MessageInternal
message) throws Exception
- {
- readLock();
+ {
+ readLock();
try
- {
- if (isReplicated())
- {
- replicator.largeMessageBegin(id);
- }
+ {
+ if (isReplicated())
+ {
+ replicator.largeMessageBegin(id);
+ }
- LargeServerMessageImpl largeMessage =
(LargeServerMessageImpl)createLargeMessage();
+ LargeServerMessageImpl largeMessage =
(LargeServerMessageImpl)createLargeMessage();
- largeMessage.copyHeadersAndProperties(message);
+ largeMessage.copyHeadersAndProperties(message);
- largeMessage.setMessageID(id);
+ largeMessage.setMessageID(id);
- if (largeMessage.isDurable())
- {
- // We store a marker on the journal that the large file is pending
- long pendingRecordID = storePendingLargeMessage(id);
+ if (largeMessage.isDurable())
+ {
+ // We store a marker on the journal that the large file is pending
+ long pendingRecordID = storePendingLargeMessage(id);
- largeMessage.setPendingRecordID(pendingRecordID);
- }
+ largeMessage.setPendingRecordID(pendingRecordID);
+ }
- return largeMessage;
+ return largeMessage;
}
- finally
- {
- readUnLock();
- }
- }
+ finally
+ {
+ readUnLock();
+ }
+ }
// Non transactional operations
@@ -713,100 +714,104 @@
}
}
- public void confirmPendingLargeMessageTX(final Transaction tx, long messageID, long
recordID) throws Exception
+ public void confirmPendingLargeMessageTX(final Transaction tx, long messageID, long
recordID) throws Exception
+ {
+ readLock();
+ try
{
- installLargeMessageConfirmationOnTX(tx, recordID);
- messageJournal.appendDeleteRecordTransactional(tx.getID(),
- recordID,
- new
DeleteEncoding(ADD_LARGE_MESSAGE_PENDING, messageID));
+ installLargeMessageConfirmationOnTX(tx, recordID);
+ messageJournal.appendDeleteRecordTransactional(tx.getID(), recordID,
+ new
DeleteEncoding(ADD_LARGE_MESSAGE_PENDING, messageID));
}
-
- /** We don't need messageID now but we are likely to need it we ever decide to
support a database */
- public void confirmPendingLargeMessage(long recordID) throws Exception
+ finally
{
- messageJournal.appendDeleteRecord(recordID, true, getContext());
+ readUnLock();
}
+ }
- public void storeMessage(final ServerMessage message) throws Exception
- {
- if (message.getMessageID() <= 0)
- {
- // Sanity check only... this shouldn't happen unless there is a
bug
- throw new HornetQException(HornetQException.ILLEGAL_STATE,
"MessageId was not assigned to Message");
- }
-
- readLock();
+ /** We don't need messageID now but we are likely to need it we ever decide to
support a database */
+ public void confirmPendingLargeMessage(long recordID) throws Exception
+ {
+ readLock();
try
- {
- // Note that we don't sync, the add reference that comes immediately
after will sync if appropriate
-
- if (message.isLargeMessage())
- {
- messageJournal.appendAddRecord(message.getMessageID(),
-
JournalStorageManager.ADD_LARGE_MESSAGE,
- new
LargeMessageEncoding((LargeServerMessage)message),
- false,
- getContext(false));
- }
- else
- {
- messageJournal.appendAddRecord(message.getMessageID(),
- JournalStorageManager.ADD_MESSAGE,
- message,
- false,
- getContext(false));
- }
- }
+ {
+ messageJournal.appendDeleteRecord(recordID, true, getContext());
+ }
finally
- {
- readUnLock();
- }
+ {
+ readUnLock();
}
+ }
- public void storeReference(final long queueID, final long messageID, final boolean
last) throws Exception
-
+ public void storeMessage(final ServerMessage message) throws Exception
+ {
+ if (message.getMessageID() <= 0)
{
- readLock();
+ // Sanity check only... this shouldn't happen unless there is a bug
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "MessageId was
not assigned to Message");
+ }
+
+ readLock();
try
- {
- messageJournal.appendUpdateRecord(messageID,
- JournalStorageManager.ADD_REF,
- new RefEncoding(queueID),
- last && syncNonTransactional,
- getContext(last &&
syncNonTransactional));
- }
+ {
+ // Note that we don't sync, the add reference that comes immediately after
will sync if
+ // appropriate
+
+ if (message.isLargeMessage())
+ {
+ messageJournal.appendAddRecord(message.getMessageID(),
JournalStorageManager.ADD_LARGE_MESSAGE,
+ new
LargeMessageEncoding((LargeServerMessage)message), false,
+ getContext(false));
+ }
+ else
+ {
+ messageJournal.appendAddRecord(message.getMessageID(),
JournalStorageManager.ADD_MESSAGE, message, false,
+ getContext(false));
+ }
+ }
finally
- {
- readUnLock();
- }
+ {
+ readUnLock();
}
+ }
- private void readLock()
+ public void storeReference(final long queueID, final long messageID, final boolean
last) throws Exception
+ {
+ readLock();
+ try
{
- storageManagerLock.readLock().lock();
+ messageJournal.appendUpdateRecord(messageID, JournalStorageManager.ADD_REF, new
RefEncoding(queueID), last &&
+ syncNonTransactional, getContext(last &&
syncNonTransactional));
}
-
- private void readUnLock()
+ finally
{
- storageManagerLock.readLock().unlock();
+ readUnLock();
}
+ }
- public void storeAcknowledge(final long queueID, final long messageID) throws
Exception
- {
- readLock();
+ private void readLock()
+ {
+ storageManagerLock.readLock().lock();
+ }
+
+ private void readUnLock()
+ {
+ storageManagerLock.readLock().unlock();
+ }
+
+ public void storeAcknowledge(final long queueID, final long messageID) throws
Exception
+ {
+ readLock();
try
- {
- messageJournal.appendUpdateRecord(messageID,
- JournalStorageManager.ACKNOWLEDGE_REF,
- new RefEncoding(queueID),
- syncNonTransactional,
- getContext(syncNonTransactional));
- }
+ {
+ messageJournal.appendUpdateRecord(messageID,
JournalStorageManager.ACKNOWLEDGE_REF, new RefEncoding(queueID),
+ syncNonTransactional,
getContext(syncNonTransactional));
+ }
finally
- {
- readUnLock();
- }
+ {
+ readUnLock();
}
+ }
public void storeCursorAcknowledge(long queueID, PagePosition position) throws
Exception
{
@@ -882,135 +887,125 @@
}
}
- public void deleteDuplicateID(final long recordID) throws Exception
- {
- readLock();
+ public void deleteDuplicateID(final long recordID) throws Exception
+ {
+ readLock();
try
- {
- messageJournal.appendDeleteRecord(recordID, syncNonTransactional,
getContext(syncNonTransactional));
- }
+ {
+ messageJournal.appendDeleteRecord(recordID, syncNonTransactional,
getContext(syncNonTransactional));
+ }
finally
- {
- readUnLock();
- }
+ {
+ readUnLock();
}
+ }
// Transactional operations
- public void storeMessageTransactional(final long txID, final ServerMessage message)
throws Exception
+ public void storeMessageTransactional(final long txID, final ServerMessage message)
throws Exception
+ {
+ if (message.getMessageID() <= 0)
{
- if (message.getMessageID() <= 0)
- {
- throw new HornetQException(HornetQException.ILLEGAL_STATE,
"MessageId was not assigned to Message");
- }
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "MessageId was
not assigned to Message");
+ }
- readLock();
+ readLock();
try
- {
- if (message.isLargeMessage())
- {
- messageJournal.appendAddRecordTransactional(txID,
-
message.getMessageID(),
-
JournalStorageManager.ADD_LARGE_MESSAGE,
- new
LargeMessageEncoding(((LargeServerMessage)message)));
- }
- else
- {
- messageJournal.appendAddRecordTransactional(txID,
-
message.getMessageID(),
-
JournalStorageManager.ADD_MESSAGE,
- message);
- }
+ {
+ if (message.isLargeMessage())
+ {
+ messageJournal.appendAddRecordTransactional(txID, message.getMessageID(),
+
JournalStorageManager.ADD_LARGE_MESSAGE,
+ new
LargeMessageEncoding(((LargeServerMessage)message)));
+ }
+ else
+ {
+ messageJournal.appendAddRecordTransactional(txID, message.getMessageID(),
+
JournalStorageManager.ADD_MESSAGE, message);
+ }
- }
+ }
finally
- {
- readUnLock();
- }
+ {
+ readUnLock();
}
+ }
- public void storePageTransaction(final long txID, final PageTransactionInfo
pageTransaction) throws Exception
- {
- readLock();
+ public void storePageTransaction(final long txID, final PageTransactionInfo
pageTransaction) throws Exception
+ {
+ readLock();
try
- {
- pageTransaction.setRecordID(generateUniqueID());
-
- messageJournal.appendAddRecordTransactional(txID,
- pageTransaction.getRecordID(),
-
JournalStorageManager.PAGE_TRANSACTION,
- pageTransaction);
- }
+ {
+ pageTransaction.setRecordID(generateUniqueID());
+ messageJournal.appendAddRecordTransactional(txID,
pageTransaction.getRecordID(),
+
JournalStorageManager.PAGE_TRANSACTION, pageTransaction);
+ }
finally
- {
- readUnLock();
- }
+ {
+ readUnLock();
}
+ }
- public void updatePageTransaction(final long txID, final PageTransactionInfo
pageTransaction, final int depages) throws Exception
- {
- readLock();
+ public void updatePageTransaction(final long txID, final PageTransactionInfo
pageTransaction, final int depages)
+ throws Exception
+ {
+ readLock();
try
- {
- messageJournal.appendUpdateRecordTransactional(txID,
pageTransaction.getRecordID(),
-
JournalStorageManager.PAGE_TRANSACTION,
- new
PageUpdateTXEncoding(pageTransaction.getTransactionID(),
-
depages));
- }
+ {
+ messageJournal.appendUpdateRecordTransactional(txID,
pageTransaction.getRecordID(),
+
JournalStorageManager.PAGE_TRANSACTION,
+ new
PageUpdateTXEncoding(pageTransaction.getTransactionID(),
+
depages));
+ }
finally
- {
- readUnLock();
- }
+ {
+ readUnLock();
}
+ }
- public void updatePageTransaction(final PageTransactionInfo pageTransaction, final
int depages) throws Exception
- {
- readLock();
+ public void updatePageTransaction(final PageTransactionInfo pageTransaction, final int
depages) throws Exception
+ {
+ readLock();
try
- {
- messageJournal.appendUpdateRecord(pageTransaction.getRecordID(),
- JournalStorageManager.PAGE_TRANSACTION,
- new
PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages),
- syncNonTransactional,
- getContext(syncNonTransactional));
- }
+ {
+ messageJournal.appendUpdateRecord(pageTransaction.getRecordID(),
JournalStorageManager.PAGE_TRANSACTION,
+ new
PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages),
+ syncNonTransactional,
getContext(syncNonTransactional));
+ }
finally
- {
- readUnLock();
- }
+ {
+ readUnLock();
}
+ }
- public void storeReferenceTransactional(final long txID, final long queueID, final
long messageID) throws Exception
- {
- readLock();
+ public void storeReferenceTransactional(final long txID, final long queueID, final
long messageID) throws Exception
+ {
+ readLock();
try
- {
- messageJournal.appendUpdateRecordTransactional(txID,
- messageID,
-
JournalStorageManager.ADD_REF,
- new RefEncoding(queueID));
- }
+ {
+ messageJournal.appendUpdateRecordTransactional(txID, messageID,
JournalStorageManager.ADD_REF,
+ new RefEncoding(queueID));
+ }
finally
- {
- readUnLock();
- }
+ {
+ readUnLock();
}
+ }
- public void storeAcknowledgeTransactional(final long txID, final long queueID,
final long messageID) throws Exception
- {
- readLock();
+ public void storeAcknowledgeTransactional(final long txID, final long queueID, final
long messageID)
+ throws Exception
+ {
+ readLock();
try
- {
- messageJournal.appendUpdateRecordTransactional(txID,
- messageID,
-
JournalStorageManager.ACKNOWLEDGE_REF,
- new RefEncoding(queueID));
- }
+ {
+ messageJournal.appendUpdateRecordTransactional(txID, messageID,
JournalStorageManager.ACKNOWLEDGE_REF,
+ new RefEncoding(queueID));
+ }
finally
- {
- readUnLock();
- }
+ {
+ readUnLock();
}
+ }
/* (non-Javadoc)
* @see
org.hornetq.core.persistence.StorageManager#storeCursorAcknowledgeTransactional(long,
long, org.hornetq.core.paging.cursor.PagePosition)
@@ -1201,48 +1196,46 @@
}
}
- public void deleteDuplicateIDTransactional(final long txID, final long recordID)
throws Exception
- {
- readLock();
+ public void deleteDuplicateIDTransactional(final long txID, final long recordID)
throws Exception
+ {
+ readLock();
try
- {
- messageJournal.appendDeleteRecordTransactional(txID, recordID);
- }
+ {
+ messageJournal.appendDeleteRecordTransactional(txID, recordID);
+ }
finally
- {
- readUnLock();
- }
+ {
+ readUnLock();
}
+ }
- // Other operations
+ // Other operations
- public void updateDeliveryCount(final MessageReference ref) throws Exception
+ public void updateDeliveryCount(final MessageReference ref) throws Exception
+ {
+ // no need to store if it's the same value
+ // otherwise the journal will get OME in case of lots of redeliveries
+ if (ref.getDeliveryCount() != ref.getPersistedCount())
{
- // no need to store if it's the same value
- // otherwise the journal will get OME in case of lots of redeliveries
- if (ref.getDeliveryCount() != ref.getPersistedCount())
- {
- ref.setPersistedCount(ref.getDeliveryCount());
- DeliveryCountUpdateEncoding updateInfo = new
DeliveryCountUpdateEncoding(ref.getQueue().getID(),
-
ref.getDeliveryCount());
+ ref.setPersistedCount(ref.getDeliveryCount());
+ DeliveryCountUpdateEncoding updateInfo =
+ new DeliveryCountUpdateEncoding(ref.getQueue().getID(),
ref.getDeliveryCount());
- readLock();
+ readLock();
try
- {
- messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
-
JournalStorageManager.UPDATE_DELIVERY_COUNT,
- updateInfo,
+ {
+ messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
+
JournalStorageManager.UPDATE_DELIVERY_COUNT, updateInfo,
- syncNonTransactional,
- getContext(syncNonTransactional));
- }
+ syncNonTransactional,
getContext(syncNonTransactional));
+ }
finally
- {
- readUnLock();
- }
- }
+ {
+ readUnLock();
+ }
}
+ }
public void storeAddressSetting(PersistedAddressSetting addressSetting) throws
Exception
{
@@ -1928,26 +1921,22 @@
}
}
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#storePageCounter(long, long,
long)
- */
- public long storePageCounter(long txID, long queueID, long value) throws Exception
- {
- readLock();
+ @Override
+ public long storePageCounter(long txID, long queueID, long value) throws Exception
+ {
+ readLock();
try
- {
- final long recordID = idGenerator.generateID();
- messageJournal.appendAddRecordTransactional(txID,
- recordID,
-
JournalStorageManager.PAGE_CURSOR_COUNTER_VALUE,
- new PageCountRecord(queueID,
value));
- return recordID;
- }
+ {
+ final long recordID = idGenerator.generateID();
+ messageJournal.appendAddRecordTransactional(txID, recordID,
JournalStorageManager.PAGE_CURSOR_COUNTER_VALUE,
+ new PageCountRecord(queueID,
value));
+ return recordID;
+ }
finally
- {
- readUnLock();
- }
- }
+ {
+ readUnLock();
+ }
+ }
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#deleteIncrementRecord(long,
long)
@@ -2184,11 +2173,11 @@
return;
}
Runnable deleteAction = new Runnable()
- {
- public void run()
- {
+ {
+ public void run()
+ {
try
- {
+ {
readLock();
try
{
@@ -2196,31 +2185,31 @@
{
replicator.largeMessageDelete(largeServerMessageImpl.getMessageID());
}
- file.delete();
- }
+ file.delete();
+ }
finally
{
readUnLock();
}
}
catch (Exception e)
- {
- JournalStorageManager.log.warn(e.getMessage(), e);
- }
- }
-
- };
-
- if (executor == null)
{
- deleteAction.run();
+ JournalStorageManager.log.warn(e.getMessage(), e);
}
- else
- {
- executor.execute(deleteAction);
- }
- }
+ }
+ };
+
+ if (executor == null)
+ {
+ deleteAction.run();
+ }
+ else
+ {
+ executor.execute(deleteAction);
+ }
+ }
+
/**
* @param messageID
* @return
@@ -2542,33 +2531,30 @@
}
}
- /**
- * @throws Exception
- */
- private void cleanupIncompleteFiles() throws Exception
- {
- if (largeMessagesFactory != null)
- {
- List<String> tmpFiles =
largeMessagesFactory.listFiles("tmp");
- for (String tmpFile : tmpFiles)
- {
- SequentialFile file =
largeMessagesFactory.createSequentialFile(tmpFile, -1);
- file.delete();
- }
- }
- }
+ private void cleanupIncompleteFiles() throws Exception
+ {
+ if (largeMessagesFactory != null)
+ {
+ List<String> tmpFiles = largeMessagesFactory.listFiles("tmp");
+ for (String tmpFile : tmpFiles)
+ {
+ SequentialFile file = largeMessagesFactory.createSequentialFile(tmpFile,
-1);
+ file.delete();
+ }
+ }
+ }
- private OperationContext getContext(final boolean sync)
- {
- if (sync)
- {
- return getContext();
- }
- else
- {
- return DummyOperationContext.getInstance();
- }
- }
+ private OperationContext getContext(final boolean sync)
+ {
+ if (sync)
+ {
+ return getContext();
+ }
+ else
+ {
+ return DummyOperationContext.getInstance();
+ }
+ }
private static ClassLoader getThisClassLoader()
{
@@ -2714,14 +2700,11 @@
public boolean isCommit;
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
@Override
- public String toString()
- {
- return "HeuristicCompletionEncoding [xid=" + xid + ",
isCommit=" + isCommit + "]";
- }
+ public String toString()
+ {
+ return "HeuristicCompletionEncoding [xid=" + xid + ",
isCommit=" + isCommit + "]";
+ }
HeuristicCompletionEncoding(final Xid xid, final boolean isCommit)
{
@@ -2807,15 +2790,12 @@
return clusterName;
}
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
@Override
- public String toString()
- {
- return "GroupingEncoding [id=" + id + ", groupId=" +
groupId + ", clusterName=" + clusterName + "]";
- }
- }
+ public String toString()
+ {
+ return "GroupingEncoding [id=" + id + ", groupId=" + groupId
+ ", clusterName=" + clusterName + "]";
+ }
+ }
public static class PersistentQueueBindingEncoding implements EncodingSupport,
QueueBindingInfo
{
@@ -2831,21 +2811,12 @@
{
}
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
@Override
- public String toString()
- {
- return "PersistentQueueBindingEncoding [id=" + id +
- ", name=" +
- name +
- ", address=" +
- address +
- ", filterString=" +
- filterString +
- "]";
- }
+ public String toString()
+ {
+ return "PersistentQueueBindingEncoding [id=" + id + ",
name=" + name + ", address=" + address +
+ ", filterString=" + filterString + "]";
+ }
public PersistentQueueBindingEncoding(final SimpleString name,
final SimpleString address,
@@ -3017,15 +2988,11 @@
return 8 + 4;
}
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
@Override
- public String toString()
- {
- return "DeliveryCountUpdateEncoding [queueID=" + queueID + ",
count=" + count + "]";
- }
-
+ public String toString()
+ {
+ return "DeliveryCountUpdateEncoding [queueID=" + queueID + ",
count=" + count + "]";
+ }
}
public static class QueueEncoding implements EncodingSupport
@@ -3058,33 +3025,30 @@
return 8;
}
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
@Override
- public String toString()
- {
- return "QueueEncoding [queueID=" + queueID + "]";
- }
+ public String toString()
+ {
+ return "QueueEncoding [queueID=" + queueID + "]";
+ }
}
private static class DeleteEncoding extends QueueEncoding
{
- public byte recordType;
+ public byte recordType;
- public long id;
+ public long id;
- public DeleteEncoding()
- {
- super();
- }
+ public DeleteEncoding()
+ {
+ super();
+ }
- public DeleteEncoding(final byte recordType, final long id)
- {
- this.recordType = recordType;
- this.id = id;
- }
+ public DeleteEncoding(final byte recordType, final long id)
+ {
+ this.recordType = recordType;
+ this.id = id;
+ }
/* (non-Javadoc)
* @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
@@ -3132,18 +3096,15 @@
public static class PageUpdateTXEncoding implements EncodingSupport
{
- public long pageTX;
+ public long pageTX;
- public int recods;
+ public int recods;
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
@Override
- public String toString()
- {
- return "PageUpdateTXEncoding [pageTX=" + pageTX + ",
recods=" + recods + "]";
- }
+ public String toString()
+ {
+ return "PageUpdateTXEncoding [pageTX=" + pageTX + ",
recods=" + recods + "]";
+ }
public PageUpdateTXEncoding()
{
@@ -3188,14 +3149,11 @@
{
long scheduledDeliveryTime;
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
@Override
- public String toString()
- {
- return "ScheduledDeliveryEncoding [scheduledDeliveryTime=" +
scheduledDeliveryTime + "]";
- }
+ public String toString()
+ {
+ return "ScheduledDeliveryEncoding [scheduledDeliveryTime=" +
scheduledDeliveryTime + "]";
+ }
private ScheduledDeliveryEncoding(final long scheduledDeliveryTime, final long
queueID)
{
@@ -3270,12 +3228,9 @@
return SimpleString.sizeofString(address) + DataConstants.SIZE_INT +
duplID.length;
}
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
@Override
- public String toString()
- {
+ public String toString()
+ {
// this would be useful when testing. Most tests on the testsuite will use a
SimpleString on the duplicate ID
// and this may be useful to validate the journal on those tests
// You may uncomment these two lines on that case and replcate the toString
for the PrintData
@@ -3341,14 +3296,11 @@
private static final class PageCountRecord implements EncodingSupport
{
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
@Override
- public String toString()
- {
- return "PageCountRecord [queueID=" + queueID + ", value="
+ value + "]";
- }
+ public String toString()
+ {
+ return "PageCountRecord [queueID=" + queueID + ", value=" +
value + "]";
+ }
PageCountRecord()
{
@@ -3396,14 +3348,11 @@
private static final class PageCountRecordInc implements EncodingSupport
{
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
@Override
- public String toString()
- {
- return "PageCountRecordInc [queueID=" + queueID + ",
value=" + value + "]";
- }
+ public String toString()
+ {
+ return "PageCountRecordInc [queueID=" + queueID + ", value="
+ value + "]";
+ }
PageCountRecordInc()
{
@@ -3475,14 +3424,11 @@
this.position = new PagePositionImpl();
}
- /* (non-Javadoc)
- * @see java.lang.Object#toString()
- */
@Override
- public String toString()
- {
- return "CursorAckRecordEncoding [queueID=" + queueID + ",
position=" + position + "]";
- }
+ public String toString()
+ {
+ return "CursorAckRecordEncoding [queueID=" + queueID + ",
position=" + position + "]";
+ }
public long queueID;
@@ -3520,57 +3466,50 @@
private class LargeMessageTXFailureCallback implements TransactionFailureCallback
{
- private final Map<Long, ServerMessage> messages;
+ private final Map<Long, ServerMessage> messages;
- public LargeMessageTXFailureCallback(final Map<Long, ServerMessage>
messages)
- {
- super();
- this.messages = messages;
- }
+ public LargeMessageTXFailureCallback(final Map<Long, ServerMessage>
messages)
+ {
+ super();
+ this.messages = messages;
+ }
- public void failedTransaction(final long transactionID,
- final List<RecordInfo> records,
- final List<RecordInfo> recordsToDelete)
- {
- for (RecordInfo record : records)
- {
- if (record.userRecordType == JournalStorageManager.ADD_LARGE_MESSAGE)
- {
- byte[] data = record.data;
+ public void failedTransaction(final long transactionID, final
List<RecordInfo> records,
+ final List<RecordInfo> recordsToDelete)
+ {
+ for (RecordInfo record : records)
+ {
+ if (record.userRecordType == JournalStorageManager.ADD_LARGE_MESSAGE)
+ {
+ byte[] data = record.data;
- HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
+ HornetQBuffer buff = HornetQBuffers.wrappedBuffer(data);
try
- {
- LargeServerMessage serverMessage = parseLargeMessage(messages,
buff);
- serverMessage.decrementDelayDeletionCount();
- }
+ {
+ LargeServerMessage serverMessage = parseLargeMessage(messages, buff);
+ serverMessage.decrementDelayDeletionCount();
+ }
catch (Exception e)
- {
- JournalStorageManager.log.warn(e.getMessage(), e);
- }
- }
+ {
+ JournalStorageManager.log.warn(e.getMessage(), e);
}
- }
+ }
+ }
+ }
+ }
+ private static String describeRecord(RecordInfo info)
+ {
+ return "recordID=" + info.id + ";userRecordType=" +
info.userRecordType + ";isUpdate=" + info.isUpdate + ";" +
+ newObjectEncoding(info);
}
- private static String describeRecord(RecordInfo info)
- {
- return "recordID=" + info.id +
- ";userRecordType=" +
- info.userRecordType +
- ";isUpdate=" +
- info.isUpdate +
- ";" +
- newObjectEncoding(info);
- }
+ private static String describeRecord(RecordInfo info, Object o)
+ {
+ return "recordID=" + info.id + ";userRecordType=" +
info.userRecordType + ";isUpdate=" + info.isUpdate + ";" + o;
+ }
- private static String describeRecord(RecordInfo info, Object o)
- {
- return "recordID=" + info.id + ";userRecordType=" +
info.userRecordType + ";isUpdate=" + info.isUpdate + ";" + o;
- }
-
// Encoding functions for binding Journal
public static Object newObjectEncoding(RecordInfo info)
@@ -3861,220 +3800,216 @@
return Base64.encodeBytes(data, 0, data.length, Base64.DONT_BREAK_LINES |
Base64.URL_SAFE);
}
- /**
- * @param fileFactory
- * @param journal
- * @throws Exception
- */
+ /**
+ * @param fileFactory
+ * @param journal
+ * @throws Exception
+ */
private static void describeJournal(SequentialFileFactory fileFactory, JournalImpl
journal) throws Exception
- {
- List<JournalFile> files = journal.orderFiles();
+ {
+ List<JournalFile> files = journal.orderFiles();
- final PrintStream out = System.out;
+ final PrintStream out = System.out;
- for (JournalFile file : files)
- {
- out.println("#" + file);
+ for (JournalFile file : files)
+ {
+ out.println("#" + file);
- JournalImpl.readJournalFile(fileFactory, file, new
JournalReaderCallback()
- {
+ JournalImpl.readJournalFile(fileFactory, file, new JournalReaderCallback()
+ {
- public void onReadUpdateRecordTX(final long transactionID, final
RecordInfo recordInfo) throws Exception
- {
- out.println("operation@UpdateTX;txID=" +
transactionID + "," + describeRecord(recordInfo));
- }
+ public void onReadUpdateRecordTX(final long transactionID, final RecordInfo
recordInfo) throws Exception
+ {
+ out.println("operation@UpdateTX;txID=" + transactionID +
"," + describeRecord(recordInfo));
+ }
- public void onReadUpdateRecord(final RecordInfo recordInfo)
throws Exception
- {
- out.println("operation@Update;" +
describeRecord(recordInfo));
- }
+ public void onReadUpdateRecord(final RecordInfo recordInfo) throws Exception
+ {
+ out.println("operation@Update;" + describeRecord(recordInfo));
+ }
- public void onReadRollbackRecord(final long transactionID) throws
Exception
- {
- out.println("operation@Rollback;txID=" +
transactionID);
- }
+ public void onReadRollbackRecord(final long transactionID) throws Exception
+ {
+ out.println("operation@Rollback;txID=" + transactionID);
+ }
- public void onReadPrepareRecord(final long transactionID, final
byte[] extraData, final int numberOfRecords) throws Exception
- {
- out.println("operation@Prepare,txID=" +
transactionID +
- ",numberOfRecords=" +
- numberOfRecords +
- ",extraData=" +
- encode(extraData));
- }
+ public void
+ onReadPrepareRecord(final long transactionID, final byte[] extraData,
final int numberOfRecords)
+ throws Exception
+ {
+ out.println("operation@Prepare,txID=" + transactionID +
",numberOfRecords=" + numberOfRecords +
+ ",extraData=" + encode(extraData));
+ }
- public void onReadDeleteRecordTX(final long transactionID, final
RecordInfo recordInfo) throws Exception
- {
- out.println("operation@DeleteRecordTX;txID=" +
transactionID + "," + describeRecord(recordInfo));
- }
+ public void onReadDeleteRecordTX(final long transactionID, final RecordInfo
recordInfo) throws Exception
+ {
+ out.println("operation@DeleteRecordTX;txID=" + transactionID +
"," + describeRecord(recordInfo));
+ }
- public void onReadDeleteRecord(final long recordID) throws
Exception
- {
- out.println("operation@DeleteRecord;recordID=" +
recordID);
- }
+ public void onReadDeleteRecord(final long recordID) throws Exception
+ {
+ out.println("operation@DeleteRecord;recordID=" + recordID);
+ }
- public void onReadCommitRecord(final long transactionID, final
int numberOfRecords) throws Exception
- {
- out.println("operation@Commit;txID=" +
transactionID + ",numberOfRecords=" + numberOfRecords);
- }
+ public void onReadCommitRecord(final long transactionID, final int
numberOfRecords) throws Exception
+ {
+ out.println("operation@Commit;txID=" + transactionID +
",numberOfRecords=" + numberOfRecords);
+ }
- public void onReadAddRecordTX(final long transactionID, final
RecordInfo recordInfo) throws Exception
- {
- out.println("operation@AddRecordTX;txID=" +
transactionID + "," + describeRecord(recordInfo));
- }
+ public void onReadAddRecordTX(final long transactionID, final RecordInfo
recordInfo) throws Exception
+ {
+ out.println("operation@AddRecordTX;txID=" + transactionID +
"," + describeRecord(recordInfo));
+ }
- public void onReadAddRecord(final RecordInfo recordInfo) throws
Exception
- {
- out.println("operation@AddRecord;" +
describeRecord(recordInfo));
- }
+ public void onReadAddRecord(final RecordInfo recordInfo) throws Exception
+ {
+ out.println("operation@AddRecord;" +
describeRecord(recordInfo));
+ }
- public void markAsDataFile(final JournalFile file)
- {
- }
- });
+ public void markAsDataFile(final JournalFile file)
+ {
}
+ });
+ }
- out.println();
+ out.println();
- out.println("### Surviving Records Summary ###");
+ out.println("### Surviving Records Summary ###");
- List<RecordInfo> records = new LinkedList<RecordInfo>();
- List<PreparedTransactionInfo> preparedTransactions = new
LinkedList<PreparedTransactionInfo>();
+ List<RecordInfo> records = new LinkedList<RecordInfo>();
+ List<PreparedTransactionInfo> preparedTransactions = new
LinkedList<PreparedTransactionInfo>();
- journal.start();
+ journal.start();
- final StringBuffer bufferFailingTransactions = new StringBuffer();
+ final StringBuffer bufferFailingTransactions = new StringBuffer();
- int messageCount = 0;
- Map<Long, Integer> messageRefCounts = new HashMap<Long, Integer>();
- int preparedMessageCount = 0;
- Map<Long, Integer> preparedMessageRefCount = new HashMap<Long,
Integer>();
- journal.load(records, preparedTransactions, new TransactionFailureCallback()
- {
+ int messageCount = 0;
+ Map<Long, Integer> messageRefCounts = new HashMap<Long, Integer>();
+ int preparedMessageCount = 0;
+ Map<Long, Integer> preparedMessageRefCount = new HashMap<Long,
Integer>();
+ journal.load(records, preparedTransactions, new TransactionFailureCallback()
+ {
- public void failedTransaction(long transactionID, List<RecordInfo>
records, List<RecordInfo> recordsToDelete)
- {
- bufferFailingTransactions.append("Transaction " +
transactionID + " failed with these records:\n");
- for (RecordInfo info : records)
- {
- bufferFailingTransactions.append("- " +
describeRecord(info) + "\n");
- }
-
- for (RecordInfo info : recordsToDelete)
- {
- bufferFailingTransactions.append("- " +
describeRecord(info) + " <marked to delete>\n");
- }
-
- }
- }, false);
-
- for (RecordInfo info : records)
+ public void failedTransaction(long transactionID, List<RecordInfo>
records, List<RecordInfo> recordsToDelete)
+ {
+ bufferFailingTransactions.append("Transaction " + transactionID +
" failed with these records:\n");
+ for (RecordInfo info : records)
{
- Object o = newObjectEncoding(info);
- if (info.getUserRecordType() == JournalStorageManager.ADD_MESSAGE)
- {
- messageCount++;
- }
- else if (info.getUserRecordType() == JournalStorageManager.ADD_REF)
- {
- ReferenceDescribe ref = (ReferenceDescribe)o;
- Integer count = messageRefCounts.get(ref.refEncoding.queueID);
- if (count == null)
- {
- count = 1;
- messageRefCounts.put(ref.refEncoding.queueID, count);
- }
- else
- {
- messageRefCounts.put(ref.refEncoding.queueID, count +
1);
- }
- }
- else if (info.getUserRecordType() ==
JournalStorageManager.ACKNOWLEDGE_REF)
- {
- AckDescribe ref = (AckDescribe)o;
- Integer count = messageRefCounts.get(ref.refEncoding.queueID);
- if (count == null)
- {
- messageRefCounts.put(ref.refEncoding.queueID, 0);
- }
- else
- {
- messageRefCounts.put(ref.refEncoding.queueID, count -
1);
- }
- }
- out.println(describeRecord(info, o));
+ bufferFailingTransactions.append("- " + describeRecord(info) +
"\n");
}
- out.println();
- out.println("### Prepared TX ###");
-
- for (PreparedTransactionInfo tx : preparedTransactions)
+ for (RecordInfo info : recordsToDelete)
{
- System.out.println(tx.id);
- for (RecordInfo info : tx.records)
- {
- Object o = newObjectEncoding(info);
- out.println("- " + describeRecord(info, o));
- if (info.getUserRecordType() == 31)
- {
- preparedMessageCount++;
- }
- else if (info.getUserRecordType() == 32)
- {
- ReferenceDescribe ref = (ReferenceDescribe)o;
- Integer count =
preparedMessageRefCount.get(ref.refEncoding.queueID);
- if (count == null)
- {
- count = 1;
-
preparedMessageRefCount.put(ref.refEncoding.queueID, count);
- }
- else
- {
-
preparedMessageRefCount.put(ref.refEncoding.queueID, count + 1);
- }
- }
- }
-
- for (RecordInfo info : tx.recordsToDelete)
- {
- out.println("- " + describeRecord(info) + "
<marked to delete>");
- }
+ bufferFailingTransactions.append("- " + describeRecord(info) +
" <marked to delete>\n");
}
- String missingTX = bufferFailingTransactions.toString();
+ }
+ }, false);
- if (missingTX.length() > 0)
+ for (RecordInfo info : records)
+ {
+ Object o = newObjectEncoding(info);
+ if (info.getUserRecordType() == JournalStorageManager.ADD_MESSAGE)
+ {
+ messageCount++;
+ }
+ else if (info.getUserRecordType() == JournalStorageManager.ADD_REF)
+ {
+ ReferenceDescribe ref = (ReferenceDescribe)o;
+ Integer count = messageRefCounts.get(ref.refEncoding.queueID);
+ if (count == null)
{
- out.println();
- out.println("### Failed Transactions (Missing
commit/prepare/rollback record) ###");
+ count = 1;
+ messageRefCounts.put(ref.refEncoding.queueID, count);
}
-
- out.println(bufferFailingTransactions.toString());
-
- out.println("### Message Counts ###");
- out.println("message count=" + messageCount);
- out.println("message reference count");
- for (Map.Entry<Long, Integer> longIntegerEntry :
messageRefCounts.entrySet())
+ else
{
- System.out.println("queue id " + longIntegerEntry.getKey() +
",count=" + longIntegerEntry.getValue());
+ messageRefCounts.put(ref.refEncoding.queueID, count + 1);
}
+ }
+ else if (info.getUserRecordType() == JournalStorageManager.ACKNOWLEDGE_REF)
+ {
+ AckDescribe ref = (AckDescribe)o;
+ Integer count = messageRefCounts.get(ref.refEncoding.queueID);
+ if (count == null)
+ {
+ messageRefCounts.put(ref.refEncoding.queueID, 0);
+ }
+ else
+ {
+ messageRefCounts.put(ref.refEncoding.queueID, count - 1);
+ }
+ }
+ out.println(describeRecord(info, o));
+ }
- out.println("prepared message count=" + preparedMessageCount);
+ out.println();
+ out.println("### Prepared TX ###");
- for (Map.Entry<Long, Integer> longIntegerEntry :
preparedMessageRefCount.entrySet())
+ for (PreparedTransactionInfo tx : preparedTransactions)
+ {
+ System.out.println(tx.id);
+ for (RecordInfo info : tx.records)
+ {
+ Object o = newObjectEncoding(info);
+ out.println("- " + describeRecord(info, o));
+ if (info.getUserRecordType() == 31)
{
- System.out.println("queue id " + longIntegerEntry.getKey() +
",count=" + longIntegerEntry.getValue());
+ preparedMessageCount++;
}
+ else if (info.getUserRecordType() == 32)
+ {
+ ReferenceDescribe ref = (ReferenceDescribe)o;
+ Integer count = preparedMessageRefCount.get(ref.refEncoding.queueID);
+ if (count == null)
+ {
+ count = 1;
+ preparedMessageRefCount.put(ref.refEncoding.queueID, count);
+ }
+ else
+ {
+ preparedMessageRefCount.put(ref.refEncoding.queueID, count + 1);
+ }
+ }
+ }
- journal.stop();
- }
+ for (RecordInfo info : tx.recordsToDelete)
+ {
+ out.println("- " + describeRecord(info) + " <marked to
delete>");
+ }
+ }
+ String missingTX = bufferFailingTransactions.toString();
+
+ if (missingTX.length() > 0)
+ {
+ out.println();
+ out.println("### Failed Transactions (Missing commit/prepare/rollback
record) ###");
+ }
+
+ out.println(bufferFailingTransactions.toString());
+
+ out.println("### Message Counts ###");
+ out.println("message count=" + messageCount);
+ out.println("message reference count");
+ for (Map.Entry<Long, Integer> longIntegerEntry :
messageRefCounts.entrySet())
+ {
+ System.out.println("queue id " + longIntegerEntry.getKey() +
",count=" + longIntegerEntry.getValue());
+ }
+
+ out.println("prepared message count=" + preparedMessageCount);
+
+ for (Map.Entry<Long, Integer> longIntegerEntry :
preparedMessageRefCount.entrySet())
+ {
+ System.out.println("queue id " + longIntegerEntry.getKey() +
",count=" + longIntegerEntry.getValue());
+ }
+
+ journal.stop();
+ }
+
@Override
- public boolean addToPage(PagingManager pagingManager,
- SimpleString address,
- ServerMessage message,
- RoutingContext ctx,
- RouteContextList listCtx) throws Exception
+ public boolean addToPage(PagingManager pagingManager, SimpleString address,
ServerMessage message,
+ RoutingContext ctx, RouteContextList listCtx) throws Exception
{
readLock();
try
@@ -4088,16 +4023,17 @@
}
}
- private void installLargeMessageConfirmationOnTX(Transaction tx, long recordID)
- {
- TXLargeMessageConfirmationOperation txoper =
(TXLargeMessageConfirmationOperation)tx.getProperty(TransactionPropertyIndexes.LARGE_MESSAGE_CONFIRMATIONS);
- if (txoper == null)
- {
- txoper = new TXLargeMessageConfirmationOperation();
- tx.putProperty(TransactionPropertyIndexes.LARGE_MESSAGE_CONFIRMATIONS,
txoper);
- }
- txoper.confirmedMessages.add(recordID);
- }
+ private void installLargeMessageConfirmationOnTX(Transaction tx, long recordID)
+ {
+ TXLargeMessageConfirmationOperation txoper =
+
(TXLargeMessageConfirmationOperation)tx.getProperty(TransactionPropertyIndexes.LARGE_MESSAGE_CONFIRMATIONS);
+ if (txoper == null)
+ {
+ txoper = new TXLargeMessageConfirmationOperation();
+ tx.putProperty(TransactionPropertyIndexes.LARGE_MESSAGE_CONFIRMATIONS, txoper);
+ }
+ txoper.confirmedMessages.add(recordID);
+ }
class TXLargeMessageConfirmationOperation implements TransactionOperation
{