JBoss hornetq SVN: r11166 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-09 09:38:59 -0400 (Tue, 09 Aug 2011)
New Revision: 11166
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
HORNETQ-720 Change from RuntimeException to HornetQException
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-08-09 13:03:23 UTC (rev 11165)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-08-09 13:38:59 UTC (rev 11166)
@@ -603,7 +603,7 @@
* XXX HORNETQ-720 Live is down, and this server was not in sync. Perhaps we should
* first try to wait a little longer to see if the 'live' comes back?
*/
- throw new RuntimeException("Backup Server was not yet in sync with live");
+ throw new HornetQException(HornetQException.ILLEGAL_STATE, "Backup Server was not yet in sync with live");
}
13 years, 5 months
JBoss hornetq SVN: r11165 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-09 09:03:23 -0400 (Tue, 09 Aug 2011)
New Revision: 11165
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
HORNETQ-720 Add locking to methods that still missed it.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-09 13:02:46 UTC (rev 11164)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-09 13:03:23 UTC (rev 11165)
@@ -762,29 +762,29 @@
public void storeMessageTransactional(final long txID, final ServerMessage message) throws Exception
{
- readLock();
- try
- {
if (message.getMessageID() <= 0)
{
throw new HornetQException(HornetQException.ILLEGAL_STATE, "MessageId was not assigned to Message");
}
- if (message.isLargeMessage())
+ readLock();
+ try
{
- messageJournal.appendAddRecordTransactional(txID,
+ if (message.isLargeMessage())
+ {
+ messageJournal.appendAddRecordTransactional(txID,
message.getMessageID(),
JournalStorageManager.ADD_LARGE_MESSAGE,
new LargeMessageEncoding(((LargeServerMessage)message)));
- }
- else
- {
- messageJournal.appendAddRecordTransactional(txID,
+ }
+ else
+ {
+ messageJournal.appendAddRecordTransactional(txID,
message.getMessageID(),
JournalStorageManager.ADD_MESSAGE,
message);
+ }
}
- }
finally
{
readUnLock();
@@ -933,33 +933,74 @@
public void deleteHeuristicCompletion(final long id) throws Exception
{
+ readLock();
+ try
+ {
+
messageJournal.appendDeleteRecord(id, true, getContext(true));
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void deletePageTransactional(final long recordID) throws Exception
{
+ readLock();
+ try
+ {
messageJournal.appendDeleteRecord(recordID, false);
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void updateScheduledDeliveryTimeTransactional(final long txID, final MessageReference ref) throws Exception
{
ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue()
.getID());
+ readLock();
+ try
+ {
messageJournal.appendUpdateRecordTransactional(txID,
ref.getMessage().getMessageID(),
JournalStorageManager.SET_SCHEDULED_DELIVERY_TIME,
encoding);
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void deleteMessageTransactional(final long txID, final long queueID, final long messageID) throws Exception
{
+ readLock();
+ try
+ {
messageJournal.appendDeleteRecordTransactional(txID, messageID, new DeleteEncoding(queueID));
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void prepare(final long txID, final Xid xid) throws Exception
{
+ readLock();
+ try
+ {
messageJournal.appendPrepareRecord(txID, new XidEncoding(xid), syncTransactional, getContext(syncTransactional));
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void commit(final long txID) throws Exception
@@ -969,17 +1010,33 @@
public void commit(final long txID, final boolean lineUpContext) throws Exception
{
+ readLock();
+ try
+ {
messageJournal.appendCommitRecord(txID, syncTransactional, getContext(syncTransactional), lineUpContext);
if (!lineUpContext && !syncTransactional)
{
// if lineUpContext == false, we have previously lined up a context, hence we need to mark it as done even if syncTransactional = false
getContext(true).done();
+ }
}
+ finally
+ {
+ readUnLock();
+ }
}
public void rollback(final long txID) throws Exception
{
+ readLock();
+ try
+ {
messageJournal.appendRollbackRecord(txID, syncTransactional, getContext(syncTransactional));
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void storeDuplicateIDTransactional(final long txID,
@@ -989,7 +1046,15 @@
{
DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
+ readLock();
+ try
+ {
messageJournal.appendAddRecordTransactional(txID, recordID, JournalStorageManager.DUPLICATE_ID, encoding);
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void updateDuplicateIDTransactional(final long txID,
@@ -999,12 +1064,28 @@
{
DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
+ readLock();
+ try
+ {
messageJournal.appendUpdateRecordTransactional(txID, recordID, JournalStorageManager.DUPLICATE_ID, encoding);
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void deleteDuplicateIDTransactional(final long txID, final long recordID) throws Exception
{
- messageJournal.appendDeleteRecordTransactional(txID, recordID);
+ readLock();
+ try
+ {
+ messageJournal.appendDeleteRecordTransactional(txID, recordID);
+ }
+ finally
+ {
+ readUnLock();
+ }
}
// Other operations
@@ -1019,7 +1100,10 @@
DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(),
ref.getDeliveryCount());
- messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
+ readLock();
+ try
+ {
+ messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
JournalStorageManager.UPDATE_DELIVERY_COUNT,
updateInfo,
@@ -1027,15 +1111,28 @@
getContext(syncNonTransactional));
}
+ finally
+ {
+ readUnLock();
+ }
+ }
}
public void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception
{
deleteAddressSetting(addressSetting.getAddressMatch());
+ readLock();
+ try
+ {
long id = idGenerator.generateID();
addressSetting.setStoreId(id);
bindingsJournal.appendAddRecord(id, ADDRESS_SETTING_RECORD, addressSetting, true);
mapPersistedAddressSettings.put(addressSetting.getAddressMatch(), addressSetting);
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public List<PersistedAddressSetting> recoverAddressSettings() throws Exception
@@ -1062,10 +1159,18 @@
{
deleteSecurityRoles(persistedRoles.getAddressMatch());
- long id = idGenerator.generateID();
+ readLock();
+ try
+ {
+ final long id = idGenerator.generateID();
persistedRoles.setStoreId(id);
bindingsJournal.appendAddRecord(id, SECURITY_RECORD, persistedRoles, true);
mapPersistedRoles.put(persistedRoles.getAddressMatch(), persistedRoles);
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void deleteAddressSetting(SimpleString addressMatch) throws Exception
@@ -1073,7 +1178,15 @@
PersistedAddressSetting oldSetting = mapPersistedAddressSettings.remove(addressMatch);
if (oldSetting != null)
{
- bindingsJournal.appendDeleteRecord(oldSetting.getStoreId(), false);
+ readLock();
+ try
+ {
+ bindingsJournal.appendDeleteRecord(oldSetting.getStoreId(), false);
+ }
+ finally
+ {
+ readUnLock();
+ }
}
}
@@ -1083,7 +1196,15 @@
PersistedRoles oldRoles = mapPersistedRoles.remove(addressMatch);
if (oldRoles != null)
{
- bindingsJournal.appendDeleteRecord(oldRoles.getStoreId(), false);
+ readLock();
+ try
+ {
+ bindingsJournal.appendDeleteRecord(oldRoles.getStoreId(), false);
+ }
+ finally
+ {
+ readUnLock();
+ }
}
}
@@ -1099,6 +1220,9 @@
List<PreparedTransactionInfo> preparedTransactions = new ArrayList<PreparedTransactionInfo>();
Map<Long, ServerMessage> messages = new HashMap<Long, ServerMessage>();
+ readLock();
+ try
+ {
JournalLoadInformation info = messageJournal.load(records,
preparedTransactions,
@@ -1498,6 +1622,11 @@
}
journalLoaded = true;
return info;
+ }
+ finally
+ {
+ readUnLock();
+ }
}
/**
@@ -1506,7 +1635,7 @@
* @param queueInfos
* @return
*/
- private PageSubscription locateSubscription(final long queueID,
+ private static PageSubscription locateSubscription(final long queueID,
final Map<Long, PageSubscription> pageSubscriptions,
final Map<Long, QueueBindingInfo> queueInfos,
final PagingManager pagingManager) throws Exception
@@ -1538,7 +1667,8 @@
readLock();
try
{
- bindingsJournal.appendAddRecord(groupBinding.getId(), JournalStorageManager.GROUP_RECORD, groupingEncoding, true);
+ bindingsJournal.appendAddRecord(groupBinding.getId(), JournalStorageManager.GROUP_RECORD, groupingEncoding,
+ true);
}
finally
{
@@ -1551,7 +1681,7 @@
readLock();
try
{
- bindingsJournal.appendDeleteRecord(groupBinding.getId(), true);
+ bindingsJournal.appendDeleteRecord(groupBinding.getId(), true);
}
finally
{
@@ -1628,13 +1758,13 @@
readLock();
try
{
- long recordID = idGenerator.generateID();
- messageJournal.appendAddRecord(recordID,
+ final long recordID = idGenerator.generateID();
+ messageJournal.appendAddRecord(recordID,
JournalStorageManager.PAGE_CURSOR_COUNTER_INC,
new PageCountRecordInc(queueID, value),
true,
getContext());
- return recordID;
+ return recordID;
}
finally
{
@@ -1650,7 +1780,7 @@
readLock();
try
{
- long recordID = idGenerator.generateID();
+ final long recordID = idGenerator.generateID();
messageJournal.appendAddRecordTransactional(txID,
recordID,
JournalStorageManager.PAGE_CURSOR_COUNTER_VALUE,
@@ -1671,8 +1801,8 @@
readLock();
try
{
- messageJournal.appendDeleteRecordTransactional(txID, recordID);
- }
+ messageJournal.appendDeleteRecordTransactional(txID, recordID);
+ }
finally
{
readUnLock();
@@ -1687,8 +1817,8 @@
readLock();
try
{
- messageJournal.appendDeleteRecordTransactional(txID, recordID);
- }
+ messageJournal.appendDeleteRecordTransactional(txID, recordID);
+ }
finally
{
readUnLock();
@@ -1785,7 +1915,7 @@
readLock();
try
{
- messageJournal.lineUpContex(getContext());
+ messageJournal.lineUpContex(getContext());
}
finally
{
@@ -1861,9 +1991,9 @@
try
{
JournalLoadInformation[] info = new JournalLoadInformation[2];
- info[0] = bindingsJournal.loadInternalOnly();
- info[1] = messageJournal.loadInternalOnly();
- return info;
+ info[0] = bindingsJournal.loadInternalOnly();
+ info[1] = messageJournal.loadInternalOnly();
+ return info;
}
finally
{
13 years, 5 months
JBoss hornetq SVN: r11164 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-09 09:02:46 -0400 (Tue, 09 Aug 2011)
New Revision: 11164
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
Log:
HORNETQ-720 Document new interface method
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java 2011-08-09 10:32:37 UTC (rev 11163)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java 2011-08-09 13:02:46 UTC (rev 11164)
@@ -24,6 +24,7 @@
import org.hornetq.core.journal.IOAsyncTask;
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
+import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
@@ -223,7 +224,8 @@
long storePageCounterInc(long queueID, int add) throws Exception;
/**
- * @return
+ * @return {@code true} if the underlying {@link SequentialFileFactory} has callback support.
+ * @see SequentialFileFactory#isSupportsCallbacks()
*/
boolean hasCallbackSupport();
13 years, 5 months
JBoss hornetq SVN: r11163 - branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-09 06:32:37 -0400 (Tue, 09 Aug 2011)
New Revision: 11163
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
Log:
HORNETQ-720 More locking control over JournalStorageManager (unfinished)
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-09 10:31:56 UTC (rev 11162)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-08-09 10:32:37 UTC (rev 11163)
@@ -600,6 +600,8 @@
}
readLock();
+ try
+ {
// Note that we don't sync, the add reference that comes immediately after will sync if appropriate
if (message.isLargeMessage())
@@ -618,19 +620,29 @@
false,
getContext(false));
}
- readUnLock();
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void storeReference(final long queueID, final long messageID, final boolean last) throws Exception
{
readLock();
+ try
+ {
messageJournal.appendUpdateRecord(messageID,
JournalStorageManager.ADD_REF,
new RefEncoding(queueID),
last && syncNonTransactional,
getContext(last && syncNonTransactional));
- readUnLock();
+ }
+ finally
+ {
+ readUnLock();
+ }
}
private void readLock()
@@ -646,17 +658,25 @@
public void storeAcknowledge(final long queueID, final long messageID) throws Exception
{
readLock();
+ try
+ {
messageJournal.appendUpdateRecord(messageID,
JournalStorageManager.ACKNOWLEDGE_REF,
new RefEncoding(queueID),
syncNonTransactional,
getContext(syncNonTransactional));
- readUnLock();
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception
{
readLock();
+ try
+ {
long ackID = idGenerator.generateID();
position.setRecordID(ackID);
messageJournal.appendAddRecord(ackID,
@@ -664,52 +684,78 @@
new CursorAckRecordEncoding(queueID, position),
syncNonTransactional,
getContext(syncNonTransactional));
- readUnLock();
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void deleteMessage(final long messageID) throws Exception
{
readLock();
+ try
+ {
// Messages are deleted on postACK, one after another.
// If these deletes are synchronized, we would build up messages on the Executor
// increasing chances of losing deletes.
// The StorageManager should verify messages without references
messageJournal.appendDeleteRecord(messageID, false, getContext(false));
- readUnLock();
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception
{
- readLock();
ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue()
.getID());
-
+ readLock();
+ try
+ {
messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
JournalStorageManager.SET_SCHEDULED_DELIVERY_TIME,
encoding,
syncNonTransactional,
getContext(syncNonTransactional));
- readUnLock();
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void storeDuplicateID(final SimpleString address, final byte[] duplID, final long recordID) throws Exception
{
+ DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
readLock();
- DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
-
- messageJournal.appendAddRecord(recordID,
+ try
+ {
+ messageJournal.appendAddRecord(recordID,
JournalStorageManager.DUPLICATE_ID,
encoding,
syncNonTransactional,
getContext(syncNonTransactional));
- readUnLock();
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void deleteDuplicateID(final long recordID) throws Exception
{
readLock();
+ try
+ {
messageJournal.appendDeleteRecord(recordID, syncNonTransactional, getContext(syncNonTransactional));
- readUnLock();
+ }
+ finally
+ {
+ readUnLock();
+ }
}
// Transactional operations
@@ -717,6 +763,8 @@
public void storeMessageTransactional(final long txID, final ServerMessage message) throws Exception
{
readLock();
+ try
+ {
if (message.getMessageID() <= 0)
{
throw new HornetQException(HornetQException.ILLEGAL_STATE, "MessageId was not assigned to Message");
@@ -736,61 +784,95 @@
JournalStorageManager.ADD_MESSAGE,
message);
}
- readUnLock();
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void storePageTransaction(final long txID, final PageTransactionInfo pageTransaction) throws Exception
{
readLock();
+ try
+ {
pageTransaction.setRecordID(generateUniqueID());
messageJournal.appendAddRecordTransactional(txID,
pageTransaction.getRecordID(),
JournalStorageManager.PAGE_TRANSACTION,
pageTransaction);
- readUnLock();
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void updatePageTransaction(final long txID, final PageTransactionInfo pageTransaction, final int depages) throws Exception
{
readLock();
+ try
+ {
messageJournal.appendUpdateRecordTransactional(txID,
pageTransaction.getRecordID(),
JournalStorageManager.PAGE_TRANSACTION,
new PageUpdateTXEncoding(pageTransaction.getTransactionID(),
depages));
- readUnLock();
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void updatePageTransaction(final PageTransactionInfo pageTransaction, final int depages) throws Exception
{
readLock();
+ try
+ {
messageJournal.appendUpdateRecord(pageTransaction.getRecordID(),
JournalStorageManager.PAGE_TRANSACTION,
new PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages),
syncNonTransactional,
getContext(syncNonTransactional));
- readUnLock();
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void storeReferenceTransactional(final long txID, final long queueID, final long messageID) throws Exception
{
readLock();
+ try
+ {
messageJournal.appendUpdateRecordTransactional(txID,
messageID,
JournalStorageManager.ADD_REF,
new RefEncoding(queueID));
- readUnLock();
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void storeAcknowledgeTransactional(final long txID, final long queueID, final long messageID) throws Exception
{
readLock();
+ try
+ {
messageJournal.appendUpdateRecordTransactional(txID,
messageID,
JournalStorageManager.ACKNOWLEDGE_REF,
new RefEncoding(queueID));
- readUnLock();
+ }
+ finally
+ {
+ readUnLock();
+ }
}
/* (non-Javadoc)
@@ -798,12 +880,20 @@
*/
public void storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition position) throws Exception
{
- long ackID = idGenerator.generateID();
- position.setRecordID(ackID);
- messageJournal.appendAddRecordTransactional(txID,
+ readLock();
+ try
+ {
+ long ackID = idGenerator.generateID();
+ position.setRecordID(ackID);
+ messageJournal.appendAddRecordTransactional(txID,
ackID,
ACKNOWLEDGE_CURSOR,
new CursorAckRecordEncoding(queueID, position));
+ }
+ finally
+ {
+ readUnLock();
+ }
}
/* (non-Javadoc)
@@ -811,19 +901,34 @@
*/
public void deleteCursorAcknowledgeTransactional(long txID, long ackID) throws Exception
{
- messageJournal.appendDeleteRecordTransactional(txID, ackID);
+ readLock();
+ try
+ {
+ messageJournal.appendDeleteRecordTransactional(txID, ackID);
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public long storeHeuristicCompletion(final Xid xid, final boolean isCommit) throws Exception
{
- long id = generateUniqueID();
-
- messageJournal.appendAddRecord(id,
+ readLock();
+ try
+ {
+ long id = generateUniqueID();
+ messageJournal.appendAddRecord(id,
JournalStorageManager.HEURISTIC_COMPLETION,
new HeuristicCompletionEncoding(xid, isCommit),
true,
getContext(true));
- return id;
+ return id;
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void deleteHeuristicCompletion(final long id) throws Exception
@@ -1430,12 +1535,28 @@
GroupingEncoding groupingEncoding = new GroupingEncoding(groupBinding.getId(),
groupBinding.getGroupId(),
groupBinding.getClusterName());
+ readLock();
+ try
+ {
bindingsJournal.appendAddRecord(groupBinding.getId(), JournalStorageManager.GROUP_RECORD, groupingEncoding, true);
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void deleteGrouping(final GroupBinding groupBinding) throws Exception
{
+ readLock();
+ try
+ {
bindingsJournal.appendDeleteRecord(groupBinding.getId(), true);
+ }
+ finally
+ {
+ readUnLock();
+ }
}
// Bindings operations
@@ -1451,16 +1572,31 @@
PersistentQueueBindingEncoding bindingEncoding = new PersistentQueueBindingEncoding(queue.getName(),
binding.getAddress(),
filterString);
-
+ readLock();
+ try
+ {
bindingsJournal.appendAddRecord(binding.getID(),
JournalStorageManager.QUEUE_BINDING_RECORD,
bindingEncoding,
true);
+ }
+ finally
+ {
+ readUnLock();
+ }
}
public void deleteQueueBinding(final long queueBindingID) throws Exception
{
+ readLock();
+ try
+ {
bindingsJournal.appendDeleteRecord(queueBindingID, true);
+ }
+ finally
+ {
+ readUnLock();
+ }
}
/* (non-Javadoc)
@@ -1468,12 +1604,20 @@
*/
public long storePageCounterInc(long txID, long queueID, int value) throws Exception
{
+ readLock();
+ try
+ {
long recordID = idGenerator.generateID();
messageJournal.appendAddRecordTransactional(txID,
recordID,
JournalStorageManager.PAGE_CURSOR_COUNTER_INC,
new PageCountRecordInc(queueID, value));
- return recordID;
+ return recordID;
+ }
+ finally
+ {
+ readUnLock();
+ }
}
/* (non-Javadoc)
@@ -1481,6 +1625,9 @@
*/
public long storePageCounterInc(long queueID, int value) throws Exception
{
+ readLock();
+ try
+ {
long recordID = idGenerator.generateID();
messageJournal.appendAddRecord(recordID,
JournalStorageManager.PAGE_CURSOR_COUNTER_INC,
@@ -1488,6 +1635,11 @@
true,
getContext());
return recordID;
+ }
+ finally
+ {
+ readUnLock();
+ }
}
/* (non-Javadoc)
@@ -1495,6 +1647,9 @@
*/
public long storePageCounter(long txID, long queueID, long value) throws Exception
{
+ readLock();
+ try
+ {
long recordID = idGenerator.generateID();
messageJournal.appendAddRecordTransactional(txID,
recordID,
@@ -1502,22 +1657,43 @@
new PageCountRecord(queueID, value));
return recordID;
}
+ finally
+ {
+ readUnLock();
+ }
+ }
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#deleteIncrementRecord(long, long)
*/
public void deleteIncrementRecord(long txID, long recordID) throws Exception
{
+ readLock();
+ try
+ {
messageJournal.appendDeleteRecordTransactional(txID, recordID);
}
+ finally
+ {
+ readUnLock();
+ }
+ }
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#deletePageCounter(long, long)
*/
public void deletePageCounter(long txID, long recordID) throws Exception
{
+ readLock();
+ try
+ {
messageJournal.appendDeleteRecordTransactional(txID, recordID);
}
+ finally
+ {
+ readUnLock();
+ }
+ }
public static void describeBindingJournal(final String bindingsDir) throws Exception
{
@@ -1606,7 +1782,15 @@
*/
public void lineUpContext()
{
+ readLock();
+ try
+ {
messageJournal.lineUpContex(getContext());
+ }
+ finally
+ {
+ readUnLock();
+ }
}
@@ -1673,10 +1857,18 @@
*/
public JournalLoadInformation[] loadInternalOnly() throws Exception
{
- JournalLoadInformation[] info = new JournalLoadInformation[2];
+ readLock();
+ try
+ {
+ JournalLoadInformation[] info = new JournalLoadInformation[2];
info[0] = bindingsJournal.loadInternalOnly();
info[1] = messageJournal.loadInternalOnly();
return info;
+ }
+ finally
+ {
+ readUnLock();
+ }
}
// Public -----------------------------------------------------------------------------------
13 years, 5 months
JBoss hornetq SVN: r11162 - branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-09 06:31:56 -0400 (Tue, 09 Aug 2011)
New Revision: 11162
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
Log:
HORNETQ-720 Initialize contents of "current" journal file used in replicating backup server.
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-08-09 10:31:20 UTC (rev 11161)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-08-09 10:31:56 UTC (rev 11162)
@@ -443,9 +443,9 @@
return nextFile;
}
- public JournalFile createRemoteBackupSyncFile(long fileID) throws Exception
+ public JournalFile createRemoteBackupSyncFile(long fileID, boolean init) throws Exception
{
- return createFile(false, false, false, false, fileID);
+ return createFile(false, false, init, false, fileID);
}
// Package protected ---------------------------------------------
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-08-09 10:31:20 UTC (rev 11161)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-08-09 10:31:56 UTC (rev 11162)
@@ -3128,11 +3128,11 @@
for (long id : fileIds)
{
maxID = Math.max(maxID, id);
- map.put(Long.valueOf(id), filesRepository.createRemoteBackupSyncFile(id));
+ map.put(Long.valueOf(id), filesRepository.createRemoteBackupSyncFile(id, false));
}
maxID += 1;
filesRepository.setNextFileID(maxID);
- return filesRepository.createRemoteBackupSyncFile(maxID);
+ return filesRepository.createRemoteBackupSyncFile(maxID, true);
}
finally
{
13 years, 5 months
JBoss hornetq SVN: r11161 - in branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core: persistence/impl/nullpm and 1 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-09 06:31:20 -0400 (Tue, 09 Aug 2011)
New Revision: 11161
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
Log:
HORNETQ-720 Avoid having two instances of JournalStorageManager in a replicated backup server.
Also added missing methods to StorageManager to avoid using JournalStorageManager.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java 2011-08-09 10:30:29 UTC (rev 11160)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java 2011-08-09 10:31:20 UTC (rev 11161)
@@ -22,6 +22,7 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.paging.PageTransactionInfo;
@@ -41,9 +42,9 @@
import org.hornetq.core.transaction.ResourceManager;
/**
- *
+ *
* A StorageManager
- *
+ *
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
* @author <a href="mailto:andy.taylor@jboss.org>Andy Taylor</a>
@@ -54,12 +55,12 @@
/** Get the context associated with the thread for later reuse */
OperationContext getContext();
-
+
void lineUpContext();
/** It just creates an OperationContext without associating it */
OperationContext newContext(Executor executor);
-
+
OperationContext newSingleThreadContext();
/** Set the context back to the thread */
@@ -77,20 +78,20 @@
void afterCompleteOperations(IOAsyncTask run);
- /** Block until the operations are done.
+ /** Block until the operations are done.
* Warning: Don't use it inside an ordered executor, otherwise the system may lock up
* in case of the pools are full
* @throws Exception */
boolean waitOnOperations(long timeout) throws Exception;
- /** Block until the operations are done.
+ /** Block until the operations are done.
* Warning: Don't use it inside an ordered executor, otherwise the system may lock up
* in case of the pools are full
* @throws Exception */
void waitOnOperations() throws Exception;
void clearContext();
-
+
long generateUniqueID();
long getCurrentUniqueID();
@@ -102,7 +103,7 @@
void deleteMessage(long messageID) throws Exception;
void storeAcknowledge(long queueID, long messageID) throws Exception;
-
+
void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception;
void updateDeliveryCount(MessageReference ref) throws Exception;
@@ -120,7 +121,7 @@
void storeAcknowledgeTransactional(long txID, long queueID, long messageID) throws Exception;
void storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition position) throws Exception;
-
+
void deleteCursorAcknowledgeTransactional(long txID, long ackID) throws Exception;
void updateScheduledDeliveryTimeTransactional(long txID, MessageReference ref) throws Exception;
@@ -136,9 +137,9 @@
LargeServerMessage createLargeMessage();
/**
- *
+ *
* @param id
- * @param message This is a temporary message that holds the parsed properties.
+ * @param message This is a temporary message that holds the parsed properties.
* The remoting layer can't create a ServerMessage directly, then this will be replaced.
* @return
*/
@@ -153,9 +154,9 @@
void rollback(long txID) throws Exception;
void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception;
-
+
void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depage) throws Exception;
-
+
void updatePageTransaction(PageTransactionInfo pageTransaction, int depage) throws Exception;
void deletePageTransactional(long recordID) throws Exception;
@@ -187,39 +188,53 @@
void addGrouping(GroupBinding groupBinding) throws Exception;
void deleteGrouping(GroupBinding groupBinding) throws Exception;
-
+
void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception;
-
+
void deleteAddressSetting(SimpleString addressMatch) throws Exception;
-
+
List<PersistedAddressSetting> recoverAddressSettings() throws Exception;
-
+
void storeSecurityRoles(PersistedRoles persistedRoles) throws Exception;
-
+
void deleteSecurityRoles(SimpleString addressMatch) throws Exception;
List<PersistedRoles> recoverPersistedRoles() throws Exception;
-
- /**
+
+ /**
* @return The ID with the stored counter
*/
long storePageCounter(long txID, long queueID, long value) throws Exception;
-
+
void deleteIncrementRecord(long txID, long recordID) throws Exception;
-
+
void deletePageCounter(long txID, long recordID) throws Exception;
/**
* @return the ID with the increment record
- * @throws Exception
+ * @throws Exception
*/
long storePageCounterInc(long txID, long queueID, int add) throws Exception;
-
+
/**
* @return the ID with the increment record
- * @throws Exception
+ * @throws Exception
*/
long storePageCounterInc(long queueID, int add) throws Exception;
-
-
+
+ /**
+ * @return
+ */
+ boolean hasCallbackSupport();
+
+ /**
+ * @return the bindings journal
+ */
+ Journal getBindingsJournal();
+
+ /**
+ * @return the message journal
+ */
+ Journal getMessageJournal();
+
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-08-09 10:30:29 UTC (rev 11160)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-08-09 10:31:20 UTC (rev 11161)
@@ -24,6 +24,7 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.paging.PageTransactionInfo;
@@ -47,9 +48,9 @@
import org.hornetq.core.transaction.ResourceManager;
/**
- *
+ *
* A NullStorageManager
- *
+ *
* @author <a href="mailto:ataylor@redhat.com">Andy Taylor</a>
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*
@@ -59,47 +60,47 @@
private final AtomicLong idSequence = new AtomicLong(0);
private volatile boolean started;
-
+
private static final OperationContext dummyContext = new OperationContext()
{
-
+
public void onError(int errorCode, String errorMessage)
{
}
-
+
public void done()
{
}
-
+
public void storeLineUp()
{
}
-
+
public boolean waitCompletion(long timeout) throws Exception
{
return true;
}
-
+
public void waitCompletion() throws Exception
{
}
-
+
public void replicationLineUp()
{
}
-
+
public void replicationDone()
{
}
-
+
public void pageSyncLineUp()
{
}
-
+
public void pageSyncDone()
{
}
-
+
public void executeOnCompletion(IOAsyncTask runnable)
{
runnable.done();
@@ -235,7 +236,7 @@
public LargeServerMessage createLargeMessage(final long id, final MessageInternal message)
{
NullStorageLargeServerMessage largeMessage = new NullStorageLargeServerMessage();
-
+
largeMessage.copyHeadersAndProperties(message);
largeMessage.setMessageID(id);
@@ -407,8 +408,8 @@
{
return dummyContext;
}
-
-
+
+
public OperationContext newSingleThreadContext()
{
return dummyContext;
@@ -566,7 +567,25 @@
public void lineUpContext()
{
// TODO Auto-generated method stub
-
+
}
+ @Override
+ public boolean hasCallbackSupport()
+ {
+ return false;
+ }
+
+ @Override
+ public Journal getBindingsJournal()
+ {
+ return null;
+ }
+
+ @Override
+ public Journal getMessageJournal()
+ {
+ return null;
+ }
+
}
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-09 10:30:29 UTC (rev 11160)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-09 10:31:20 UTC (rev 11161)
@@ -35,7 +35,7 @@
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
-import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.Packet;
@@ -90,7 +90,7 @@
/** Used to hold the real Journals before the backup is synchronized. */
private final Map<JournalContent, Journal> journalsHolder = new HashMap<JournalContent, Journal>();
- private JournalStorageManager storage;
+ private StorageManager storage;
private PagingManager pageManager;
@@ -234,7 +234,7 @@
{
Configuration config = server.getConfiguration();
- storage = new JournalStorageManager(config, server.getExecutorFactory());
+ storage = server.getStorageManager();
storage.start();
server.getManagementService().setStorageManager(storage);
13 years, 5 months
JBoss hornetq SVN: r11160 - branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-09 06:30:29 -0400 (Tue, 09 Aug 2011)
New Revision: 11160
Modified:
branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
Remove FakeStorageManager as it is the same thing as the NullStorageManager.
Modified: branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-08-09 10:29:32 UTC (rev 11159)
+++ branches/HORNETQ-720_Replication/tests/unit-tests/src/test/java/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2011-08-09 10:30:29 UTC (rev 11160)
@@ -15,7 +15,6 @@
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -27,59 +26,39 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import javax.transaction.xa.Xid;
-
import junit.framework.Assert;
import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.HornetQBuffers;
-import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.core.journal.IOAsyncTask;
-import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
-import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.PagingStoreFactory;
-import org.hornetq.core.paging.cursor.PagePosition;
import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.paging.impl.PagingStoreImpl;
import org.hornetq.core.paging.impl.TestSupportPageStore;
-import org.hornetq.core.persistence.GroupingInfo;
-import org.hornetq.core.persistence.OperationContext;
-import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.persistence.config.PersistedAddressSetting;
-import org.hornetq.core.persistence.config.PersistedRoles;
import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
-import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.PostOffice;
-import org.hornetq.core.replication.ReplicationManager;
-import org.hornetq.core.server.LargeServerMessage;
-import org.hornetq.core.server.MessageReference;
-import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.server.impl.RoutingContextImpl;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressFullMessagePolicy;
import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.core.transaction.ResourceManager;
import org.hornetq.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
-import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
import org.hornetq.utils.ExecutorFactory;
/**
- *
+ *
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
*
*/
@@ -704,7 +683,7 @@
public void testRestartPage() throws Throwable
{
clearData();
- SequentialFileFactory factory = new NIOSequentialFileFactory(this.getPageDir());
+ SequentialFileFactory factory = new NIOSequentialFileFactory(UnitTestCase.getPageDir());
PagingStoreFactory storeFactory = new FakeStoreFactory(factory);
@@ -740,14 +719,14 @@
storeImpl.startPaging();
assertNotNull(storeImpl.getCurrentPage());
-
+
storeImpl.stop();
}
public void testOrderOnPaging() throws Throwable
{
clearData();
- SequentialFileFactory factory = new NIOSequentialFileFactory(this.getPageDir());
+ SequentialFileFactory factory = new NIOSequentialFileFactory(UnitTestCase.getPageDir());
PagingStoreFactory storeFactory = new FakeStoreFactory(factory);
@@ -903,14 +882,9 @@
private StorageManager createStorageManagerMock()
{
- return new FakeStorageManager();
+ return new NullStorageManager();
}
- private PostOffice createPostOfficeMock()
- {
- return new FakePostOffice();
- }
-
private ExecutorFactory getExecutorFactory()
{
return new ExecutorFactory()
@@ -1113,575 +1087,7 @@
}
- class FakeStorageManager implements StorageManager
- {
- public void setUniqueIDSequence(final long id)
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#addQueueBinding(org.hornetq.core.postoffice.Binding)
- */
- public void addQueueBinding(final Binding binding) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#commit(long)
- */
- public void commit(final long txID) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#createLargeMessage()
- */
- public LargeServerMessage createLargeMessage()
- {
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#deleteDuplicateID(long)
- */
- public void deleteDuplicateID(final long recordID) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#deleteDuplicateIDTransactional(long, long)
- */
- public void deleteDuplicateIDTransactional(final long txID, final long recordID) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#deleteMessage(long)
- */
- public void deleteMessage(final long messageID) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#deleteMessageTransactional(long, long, long)
- */
- public void deleteMessageTransactional(final long txID, final long queueID, final long messageID) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#deletePageTransactional(long, long)
- */
- public void deletePageTransactional(final long txID, final long recordID) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#deleteQueueBinding(long)
- */
- public void deleteQueueBinding(final long queueBindingID) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#generateUniqueID()
- */
- public long generateUniqueID()
- {
- return 0;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#getCurrentUniqueID()
- */
- public long getCurrentUniqueID()
- {
- return 0;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#loadBindingJournal(java.util.List)
- */
- public JournalLoadInformation loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos,
- final List<GroupingInfo> groupingInfos) throws Exception
- {
- return new JournalLoadInformation();
- }
-
- public void addGrouping(final GroupBinding groupBinding) throws Exception
- {
- // To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void deleteGrouping(final GroupBinding groupBinding) throws Exception
- {
- // To change body of implemented methods use File | Settings | File Templates.
- }
-
- public void sync()
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#loadMessageJournal(org.hornetq.core.paging.PagingManager, java.util.Map, org.hornetq.core.transaction.ResourceManager, java.util.Map)
- */
- public JournalLoadInformation loadMessageJournal(final PostOffice postOffice,
- final PagingManager pagingManager,
- final ResourceManager resourceManager,
- final Map<Long, Queue> queues,
- Map<Long, QueueBindingInfo> queueInfos,
- final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
- {
- return new JournalLoadInformation();
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#prepare(long, javax.transaction.xa.Xid)
- */
- public void prepare(final long txID, final Xid xid) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#rollback(long)
- */
- public void rollback(final long txID) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#storeAcknowledge(long, long)
- */
- public void storeAcknowledge(final long queueID, final long messageID) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#storeAcknowledgeTransactional(long, long, long)
- */
- public void storeAcknowledgeTransactional(final long txID, final long queueID, final long messageID) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#storeDuplicateID(org.hornetq.utils.SimpleString, byte[], long)
- */
- public void storeDuplicateID(final SimpleString address, final byte[] duplID, final long recordID) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#storeDuplicateIDTransactional(long, org.hornetq.utils.SimpleString, byte[], long)
- */
- public void storeDuplicateIDTransactional(final long txID,
- final SimpleString address,
- final byte[] duplID,
- final long recordID) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#storeMessage(org.hornetq.core.server.ServerMessage)
- */
- public void storeMessage(final ServerMessage message) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#storeMessageTransactional(long, org.hornetq.core.server.ServerMessage)
- */
- public void storeMessageTransactional(final long txID, final ServerMessage message) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#storePageTransaction(long, org.hornetq.core.paging.PageTransactionInfo)
- */
- public void storePageTransaction(final long txID, final PageTransactionInfo pageTransaction) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#storeReference(long, long)
- */
- public void storeReference(final long queueID, final long messageID) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#storeReferenceTransactional(long, long, long)
- */
- public void storeReferenceTransactional(final long txID, final long queueID, final long messageID) throws Exception
- {
- }
-
- public long storeHeuristicCompletion(final Xid xid, final boolean isCommit) throws Exception
- {
- return -1;
- }
-
- public void deleteHeuristicCompletion(final long txID) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#updateDeliveryCount(org.hornetq.core.server.MessageReference)
- */
- public void updateDeliveryCount(final MessageReference ref) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#updateDuplicateID(org.hornetq.utils.SimpleString, byte[], long)
- */
- public void updateDuplicateID(final SimpleString address, final byte[] duplID, final long recordID) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#updateDuplicateIDTransactional(long, org.hornetq.utils.SimpleString, byte[], long)
- */
- public void updateDuplicateIDTransactional(final long txID,
- final SimpleString address,
- final byte[] duplID,
- final long recordID) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#updateScheduledDeliveryTime(org.hornetq.core.server.MessageReference)
- */
- public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#updateScheduledDeliveryTimeTransactional(long, org.hornetq.core.server.MessageReference)
- */
- public void updateScheduledDeliveryTimeTransactional(final long txID, final MessageReference ref) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQComponent#isStarted()
- */
- public boolean isStarted()
- {
- return false;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQComponent#start()
- */
- public void start() throws Exception
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQComponent#stop()
- */
- public void stop() throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#afterReplicated(java.lang.Runnable)
- */
- public void afterCompleteOperations(final Runnable run)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#completeReplication()
- */
- public void completeOperations()
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#createLargeMessage(byte[])
- */
- public LargeServerMessage createLargeMessage(final long messageId, final MessageInternal msg)
- {
-
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#isReplicated()
- */
- public boolean isReplicated()
- {
-
- return false;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#loadInternalOnly()
- */
- public JournalLoadInformation[] loadInternalOnly() throws Exception
- {
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#pageClosed(org.hornetq.utils.SimpleString, int)
- */
- public void pageClosed(final SimpleString storeName, final int pageNumber)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#pageDeleted(org.hornetq.utils.SimpleString, int)
- */
- public void pageDeleted(final SimpleString storeName, final int pageNumber)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#pageWrite(org.hornetq.core.paging.PagedMessage, int)
- */
- public void pageWrite(final PagedMessage message, final int pageNumber)
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#blockOnReplication(long)
- */
- public boolean waitOnOperations(final long timeout) throws Exception
- {
- return true;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#setReplicator(org.hornetq.core.replication.ReplicationManager)
- */
- public void setReplicator(final ReplicationManager replicator)
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#afterCompleteOperations(org.hornetq.core.journal.IOCompletion)
- */
- public void afterCompleteOperations(final IOAsyncTask run)
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#waitOnOperations()
- */
- public void waitOnOperations() throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#getContext()
- */
- public OperationContext getContext()
- {
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#newContext(java.util.concurrent.Executor)
- */
- public OperationContext newContext(final Executor executor)
- {
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#clearContext()
- */
- public void clearContext()
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#setContext(org.hornetq.core.persistence.OperationContext)
- */
- public void setContext(final OperationContext context)
- {
- }
-
- public void storeReference(final long queueID, final long messageID, final boolean last) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#recoverAddressSettings()
- */
- public List<PersistedAddressSetting> recoverAddressSettings() throws Exception
- {
- return Collections.emptyList();
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#recoverPersistedRoles()
- */
- public List<PersistedRoles> recoverPersistedRoles() throws Exception
- {
- return Collections.emptyList();
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#storeAddressSetting(org.hornetq.core.persistconfig.PersistedAddressSetting)
- */
- public void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#storeSecurityRoles(org.hornetq.core.persistconfig.PersistedRoles)
- */
- public void storeSecurityRoles(PersistedRoles persistedRoles) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#deleteAddressSetting(org.hornetq.api.core.SimpleString)
- */
- public void deleteAddressSetting(SimpleString addressMatch) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#deleteSecurityRoles(org.hornetq.api.core.SimpleString)
- */
- public void deleteSecurityRoles(SimpleString addressMatch) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#deletePageTransactional(long)
- */
- public void deletePageTransactional(long recordID) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#updatePageTransaction(long, org.hornetq.core.paging.PageTransactionInfo, int)
- */
- public void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depage) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#storeCursorAcknowledge(long, org.hornetq.core.paging.cursor.PagePosition)
- */
- public void storeCursorAcknowledge(long queueID, PagePosition position)
- {
- // TODO Auto-generated method stub
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#storeCursorAcknowledgeTransactional(long, long, org.hornetq.core.paging.cursor.PagePosition)
- */
- public void storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition position)
- {
- // TODO Auto-generated method stub
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#deleteCursorAcknowledgeTransactional(long, long)
- */
- public void deleteCursorAcknowledgeTransactional(long txID, long ackID) throws Exception
- {
- // TODO Auto-generated method stub
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#updatePageTransaction(org.hornetq.core.paging.PageTransactionInfo, int)
- */
- public void updatePageTransaction(PageTransactionInfo pageTransaction, int depage) throws Exception
- {
- // TODO Auto-generated method stub
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#storePageCounter(long, long, long)
- */
- public long storePageCounter(long txID, long queueID, long value) throws Exception
- {
- // TODO Auto-generated method stub
- return 0;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#deleteIncrementRecord(long, long)
- */
- public void deleteIncrementRecord(long txID, long recordID) throws Exception
- {
- // TODO Auto-generated method stub
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#deletePageCounter(long, long)
- */
- public void deletePageCounter(long txID, long recordID) throws Exception
- {
- // TODO Auto-generated method stub
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#storePageCounterInc(long, long, int)
- */
- public long storePageCounterInc(long txID, long queueID, int add) throws Exception
- {
- // TODO Auto-generated method stub
- return 0;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#storePageCounterInc(long, int)
- */
- public long storePageCounterInc(long queueID, int add) throws Exception
- {
- // TODO Auto-generated method stub
- return 0;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#newSingleThreadContext()
- */
- public OperationContext newSingleThreadContext()
- {
- return getContext();
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#commit(long, boolean)
- */
- public void commit(long txID, boolean lineUpContext) throws Exception
- {
- // TODO Auto-generated method stub
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#lineUpContext()
- */
- public void lineUpContext()
- {
- // TODO Auto-generated method stub
-
- }
-
- }
-
class FakeStoreFactory implements PagingStoreFactory
{
13 years, 5 months
JBoss hornetq SVN: r11159 - in branches/HORNETQ-720_Replication: hornetq-journal/src/main/java/org/hornetq/core/journal and 3 other directories.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-09 06:29:32 -0400 (Tue, 09 Aug 2011)
New Revision: 11159
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
HORNETQ-720 Reload after sync is done.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java 2011-08-09 10:27:51 UTC (rev 11158)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicatedJournal.java 2011-08-09 10:29:32 UTC (rev 11159)
@@ -570,6 +570,11 @@
localJournal.lineUpContex(callback);
}
+ @Override
+ public JournalLoadInformation loadSyncOnly() throws Exception
+ {
+ return localJournal.loadSyncOnly();
+ }
// Package protected ---------------------------------------------
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-09 10:27:51 UTC (rev 11158)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-09 10:29:32 UTC (rev 11159)
@@ -81,7 +81,7 @@
private Channel channel;
private Journal[] journals;
- private JournalLoadInformation[] journalLoadInformation;
+ private final JournalLoadInformation[] journalLoadInformation = new JournalLoadInformation[2];
/** Files reserved in each journal for synchronization of existing data from the 'live' server. */
private final Map<JournalContent, Map<Long, JournalFile>> filesReservedForSync =
@@ -244,13 +244,11 @@
for (JournalContent jc : EnumSet.allOf(JournalContent.class))
{
- System.out.println("State? " + journalsHolder.get(jc));
filesReservedForSync.put(jc, new HashMap<Long, JournalFile>());
+ // We only need to load internal structures on the backup...
+ journalLoadInformation[jc.typeByte] = journalsHolder.get(jc).loadSyncOnly();
}
- // We only need to load internal structures on the backup...
- journalLoadInformation = storage.loadInternalOnly();
-
pageManager = new PagingManagerImpl(new PagingStoreFactoryNIO(config.getPagingDirectory(),
config.getJournalBufferSize_NIO(),
server.getScheduledPool(),
@@ -395,7 +393,7 @@
{
for (JournalContent jc : EnumSet.allOf(JournalContent.class))
{
- JournalImpl journal = (JournalImpl)journalsHolder.get(jc);
+ JournalImpl journal = (JournalImpl)journalsHolder.remove(jc);
journal.writeLock();
try
{
@@ -404,10 +402,12 @@
throw new IllegalStateException("Journal should not have any data files at this point");
}
// files should be already in place.
- filesReservedForSync.remove(msg.getJournalContent());
- registerJournal(jc.typeByte, journalsHolder.get(jc));
+ filesReservedForSync.remove(jc);
+ registerJournal(jc.typeByte, journal);
+ journal.loadInternalOnly();
// XXX HORNETQ-720 must reload journals
// XXX HORNETQ-720 must start using real journals
+
}
finally
{
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java 2011-08-09 10:27:51 UTC (rev 11158)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/Journal.java 2011-08-09 10:29:32 UTC (rev 11159)
@@ -111,11 +111,18 @@
JournalLoadInformation load(LoaderCallback reloadManager) throws Exception;
- /** Load internal data structures and not expose any data.
- * This is only useful if you're using the journal but not interested on the current data.
- * Useful in situations where the journal is being replicated, copied... etc. */
+ /**
+ * Load internal data structures and not expose any data. This is only useful if you're using the
+ * journal but not interested on the current data. Useful in situations where the journal is
+ * being replicated, copied... etc.
+ */
JournalLoadInformation loadInternalOnly() throws Exception;
+ /**
+ * Load internal data structures, and remain waiting for synchronization to complete.
+ */
+ JournalLoadInformation loadSyncOnly() throws Exception;
+
void lineUpContex(IOCompletion callback);
JournalLoadInformation load(List<RecordInfo> committedRecords,
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-08-09 10:27:51 UTC (rev 11158)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-08-09 10:29:32 UTC (rev 11159)
@@ -1298,35 +1298,39 @@
return fileFactory.getAlignment();
}
- public synchronized JournalLoadInformation loadInternalOnly() throws Exception
+ private static class DummyLoader implements LoaderCallback
{
- LoaderCallback dummyLoader = new LoaderCallback()
+ static final LoaderCallback INSTANCE = new DummyLoader();
+ public void failedTransaction(final long transactionID, final List<RecordInfo> records,
+ final List<RecordInfo> recordsToDelete)
{
+ }
- public void failedTransaction(final long transactionID,
- final List<RecordInfo> records,
- final List<RecordInfo> recordsToDelete)
- {
- }
+ public void updateRecord(final RecordInfo info)
+ {
+ }
- public void updateRecord(final RecordInfo info)
- {
- }
+ public void deleteRecord(final long id)
+ {
+ }
- public void deleteRecord(final long id)
- {
- }
+ public void addRecord(final RecordInfo info)
+ {
+ }
- public void addRecord(final RecordInfo info)
- {
- }
+ public void addPreparedTransaction(final PreparedTransactionInfo preparedTransaction)
+ {
+ }
+ }
- public void addPreparedTransaction(final PreparedTransactionInfo preparedTransaction)
- {
- }
- };
+ public synchronized JournalLoadInformation loadInternalOnly() throws Exception
+ {
+ return load(DummyLoader.INSTANCE, true, false);
+ }
- return this.load(dummyLoader, true, true);
+ public synchronized JournalLoadInformation loadSyncOnly() throws Exception
+ {
+ return load(DummyLoader.INSTANCE, true, true);
}
public JournalLoadInformation load(final List<RecordInfo> committedRecords,
@@ -1746,12 +1750,16 @@
private synchronized JournalLoadInformation load(final LoaderCallback loadManager, boolean fixFailingTransactions,
final boolean replicationSync) throws Exception
{
-
- if (state != JournalState.STARTED)
+ System.out.println("LOAD! " + state + " " + replicationSync);
+ if (state == JournalState.STOPPED || state == JournalState.LOADED)
{
throw new IllegalStateException("Journal " + this + " must be in " + JournalState.STARTED + " state, was " +
state);
}
+ if (state == JournalState.SYNCING && replicationSync)
+ {
+ throw new IllegalStateException("Journal must be state " + JournalState.STARTED);
+ }
checkControlFile();
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java 2011-08-09 10:27:51 UTC (rev 11158)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java 2011-08-09 10:29:32 UTC (rev 11159)
@@ -223,4 +223,10 @@
{
throw new UnsupportedOperationException();
}
+
+ @Override
+ public JournalLoadInformation loadSyncOnly() throws Exception
+ {
+ throw new UnsupportedOperationException();
+ }
}
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-08-09 10:27:51 UTC (rev 11158)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/cluster/failover/BackupJournalSyncTest.java 2011-08-09 10:29:32 UTC (rev 11159)
@@ -161,10 +161,6 @@
@Override
protected void tearDown() throws Exception
{
- if (handler != null)
- {
- handler.notifyAll();
- }
if (sessionFactory != null)
sessionFactory.close();
if (session != null)
Modified: branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java 2011-08-09 10:27:51 UTC (rev 11158)
+++ branches/HORNETQ-720_Replication/tests/integration-tests/src/test/java/org/hornetq/tests/integration/replication/ReplicationTest.java 2011-08-09 10:29:32 UTC (rev 11159)
@@ -846,5 +846,16 @@
}
+ /*
+ * (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#loadSyncOnly()
+ */
+ @Override
+ public JournalLoadInformation loadSyncOnly() throws Exception
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
}
}
13 years, 5 months
JBoss hornetq SVN: r11158 - in branches/HORNETQ-720_Replication: hornetq-journal/src/main/java/org/hornetq/core/journal/impl and 1 other directory.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-09 06:27:51 -0400 (Tue, 09 Aug 2011)
New Revision: 11158
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java
Log:
HORNETQ-720 More support for replication during sync.
Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-09 10:26:46 UTC (rev 11157)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-09 10:27:51 UTC (rev 11158)
@@ -456,6 +456,7 @@
JournalImpl journal = assertJournalImpl(journalIf);
Map<Long, JournalFile> mapToFill = filesReservedForSync.get(packet.getJournalContentType());
JournalFile current = journal.createFilesForRemoteSync(packet.getFileIds(), mapToFill);
+ current.getFile().open(1, false);
registerJournal(packet.getJournalContentType().typeByte,
new ReplicatingJournal(current, storage.hasCallbackSupport()));
}
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java 2011-08-09 10:26:46 UTC (rev 11157)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java 2011-08-09 10:27:51 UTC (rev 11158)
@@ -36,7 +36,7 @@
public ReplicatingJournal(JournalFile file, boolean hasCallbackSupport)
{
super(hasCallbackSupport);
- this.currentFile = file;
+ currentFile = file;
}
@Override
@@ -77,36 +77,39 @@
{
JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record);
- if (callback != null)
- {
- callback.storeLineUp();
- }
+ writeRecord(addRecord, sync, callback);
- lockAppend.lock();
- try
- {
- appendRecord(addRecord, sync, callback);
- }
- finally
- {
- lockAppend.unlock();
- }
}
/**
* Write the record to the current file.
*/
- private void appendRecord(JournalInternalRecord encoder, boolean sync, IOCompletion callback) throws Exception
+ private void writeRecord(JournalInternalRecord encoder, boolean sync, IOCompletion callback) throws Exception
{
- encoder.setFileID(currentFile.getRecordID());
- if (callback != null)
+
+ lockAppend.lock();
+ try
{
- currentFile.getFile().write(encoder, sync, callback);
+ if (callback != null)
+ {
+ callback.storeLineUp();
+ }
+
+ encoder.setFileID(currentFile.getRecordID());
+
+ if (callback != null)
+ {
+ currentFile.getFile().write(encoder, sync, callback);
+ }
+ else
+ {
+ currentFile.getFile().write(encoder, sync);
+ }
}
- else
+ finally
{
- currentFile.getFile().write(encoder, sync);
+ lockAppend.unlock();
}
}
@@ -134,7 +137,8 @@
appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync, IOCompletion callback)
throws Exception
{
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+ JournalInternalRecord updateRecord = new JournalAddRecord(false, id, recordType, record);
+ writeRecord(updateRecord, sync, callback);
}
@Override
13 years, 5 months
JBoss hornetq SVN: r11157 - branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: borges
Date: 2011-08-09 06:26:46 -0400 (Tue, 09 Aug 2011)
New Revision: 11157
Added:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalBase.java
Modified:
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java
Log:
HORNETQ-720 Move common 'boiler-plate' journaling code into base-class
Added: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalBase.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalBase.java (rev 0)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalBase.java 2011-08-09 10:26:46 UTC (rev 11157)
@@ -0,0 +1,216 @@
+package org.hornetq.core.journal.impl;
+
+import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.IOCompletion;
+import org.hornetq.core.journal.impl.dataformat.ByteArrayEncoding;
+
+abstract class JournalBase
+{
+
+ private final boolean hasCallbackSupport;
+
+ public JournalBase(boolean hasCallbackSupport)
+ {
+ this.hasCallbackSupport = hasCallbackSupport;
+ }
+
+ abstract public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record,
+ final boolean sync, final IOCompletion callback) throws Exception;
+
+ abstract public void appendAddRecordTransactional(final long txID, final long id, final byte recordType,
+ final EncodingSupport record) throws Exception;
+
+ abstract public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion callback,
+ boolean lineUpContext) throws Exception;
+
+ abstract public void appendDeleteRecord(final long id, final boolean sync, final IOCompletion callback)
+ throws Exception;
+
+ abstract public void appendDeleteRecordTransactional(final long txID, final long id, final EncodingSupport record)
+ throws Exception;
+
+ abstract public void appendPrepareRecord(final long txID, final EncodingSupport transactionData, final boolean sync,
+ final IOCompletion callback) throws Exception;
+
+ abstract public void appendUpdateRecord(final long id, final byte recordType, final EncodingSupport record,
+ final boolean sync, final IOCompletion callback) throws Exception;
+
+ abstract public void appendUpdateRecordTransactional(final long txID, final long id, final byte recordType,
+ final EncodingSupport record) throws Exception;
+
+ abstract public void appendRollbackRecord(final long txID, final boolean sync, final IOCompletion callback)
+ throws Exception;
+
+
+ public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception
+ {
+ appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync);
+ }
+
+ public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync, IOCompletion completionCallback)
+ throws Exception
+ {
+ appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync, completionCallback);
+ }
+
+ public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception
+ {
+ SyncIOCompletion callback = getSyncCallback(sync);
+
+ appendAddRecord(id, recordType, record, sync, callback);
+
+ if (callback != null)
+ {
+ callback.waitCompletion();
+ }
+ }
+
+ public void appendCommitRecord(final long txID, final boolean sync) throws Exception
+ {
+ SyncIOCompletion syncCompletion = getSyncCallback(sync);
+
+ appendCommitRecord(txID, sync, syncCompletion, true);
+
+ if (syncCompletion != null)
+ {
+ syncCompletion.waitCompletion();
+ }
+ }
+
+ public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception
+ {
+ appendCommitRecord(txID, sync, callback, true);
+ }
+ public void appendUpdateRecord(final long id, final byte recordType, final byte[] record, final boolean sync)
+ throws Exception
+ {
+ appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync);
+ }
+
+ public void appendUpdateRecordTransactional(final long txID, final long id, final byte recordType,
+ final byte[] record) throws Exception
+ {
+ appendUpdateRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record));
+ }
+
+ public void appendUpdateRecord(final long id, final byte recordType, final byte[] record, final boolean sync,
+ final IOCompletion callback) throws Exception
+ {
+ appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync, callback);
+ }
+
+ public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, final byte[] record)
+ throws Exception
+ {
+ appendAddRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record));
+ }
+
+ public void appendDeleteRecordTransactional(final long txID, final long id) throws Exception
+ {
+ appendDeleteRecordTransactional(txID, id, NullEncoding.instance);
+ }
+
+ public void appendPrepareRecord(final long txID, final byte[] transactionData, final boolean sync,
+ final IOCompletion completion) throws Exception
+ {
+ appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync, completion);
+ }
+
+ public void appendPrepareRecord(final long txID, final byte[] transactionData, final boolean sync) throws Exception
+ {
+ appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync);
+ }
+
+ public void appendPrepareRecord(final long txID, final EncodingSupport transactionData, final boolean sync)
+ throws Exception
+ {
+ SyncIOCompletion syncCompletion = getSyncCallback(sync);
+
+ appendPrepareRecord(txID, transactionData, sync, syncCompletion);
+
+ if (syncCompletion != null)
+ {
+ syncCompletion.waitCompletion();
+ }
+ }
+
+ public void appendDeleteRecordTransactional(final long txID, final long id, final byte[] record) throws Exception
+ {
+ appendDeleteRecordTransactional(txID, id, new ByteArrayEncoding(record));
+ }
+
+ public void
+ appendUpdateRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync)
+ throws Exception
+ {
+ SyncIOCompletion callback = getSyncCallback(sync);
+
+ appendUpdateRecord(id, recordType, record, sync, callback);
+
+ if (callback != null)
+ {
+ callback.waitCompletion();
+ }
+ }
+
+ public void appendRollbackRecord(final long txID, final boolean sync) throws Exception
+ {
+ SyncIOCompletion syncCompletion = getSyncCallback(sync);
+
+ appendRollbackRecord(txID, sync, syncCompletion);
+
+ if (syncCompletion != null)
+ {
+ syncCompletion.waitCompletion();
+ }
+
+ }
+
+ public void appendDeleteRecord(final long id, final boolean sync) throws Exception
+ {
+ SyncIOCompletion callback = getSyncCallback(sync);
+
+ appendDeleteRecord(id, sync, callback);
+
+ if (callback != null)
+ {
+ callback.waitCompletion();
+ }
+ }
+
+ protected SyncIOCompletion getSyncCallback(final boolean sync)
+ {
+ if (hasCallbackSupport)
+ {
+ if (sync)
+ {
+ return new SimpleWaitIOCallback();
+ }
+ return DummyCallback.getInstance();
+ }
+ return null;
+ }
+
+ private static class NullEncoding implements EncodingSupport
+ {
+
+ private static NullEncoding instance = new NullEncoding();
+
+ public void decode(final HornetQBuffer buffer)
+ {
+ // no-op
+ }
+
+ public void encode(final HornetQBuffer buffer)
+ {
+ // no-op
+ }
+
+ public int getEncodeSize()
+ {
+ return 0;
+ }
+ }
+
+}
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-08-09 09:43:05 UTC (rev 11156)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/JournalImpl.java 2011-08-09 10:26:46 UTC (rev 11157)
@@ -72,7 +72,7 @@
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert Suconic</a>
*/
-public class JournalImpl implements TestableJournal, JournalRecordProvider
+public class JournalImpl extends JournalBase implements TestableJournal, JournalRecordProvider
{
private enum JournalState
@@ -242,10 +242,7 @@
final int maxAIO,
final int userVersion)
{
- if (fileFactory == null)
- {
- throw new NullPointerException("fileFactory is null");
- }
+ super(fileFactory.isSupportsCallbacks());
if (fileSize < JournalImpl.MIN_FILE_SIZE)
{
throw new IllegalArgumentException("File size cannot be less than " + JournalImpl.MIN_FILE_SIZE + " bytes");
@@ -793,34 +790,9 @@
// Journal implementation
// ----------------------------------------------------------------
- public void appendAddRecord(final long id, final byte recordType, final byte[] record, final boolean sync) throws Exception
- {
- appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync);
- }
-
+ @Override
public void appendAddRecord(final long id,
final byte recordType,
- final byte[] record,
- final boolean sync,
- final IOCompletion callback) throws Exception
- {
- appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync, callback);
- }
-
- public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync) throws Exception
- {
- SyncIOCompletion callback = getSyncCallback(sync);
-
- appendAddRecord(id, recordType, record, sync, callback);
-
- if (callback != null)
- {
- callback.waitCompletion();
- }
- }
-
- public void appendAddRecord(final long id,
- final byte recordType,
final EncodingSupport record,
final boolean sync,
final IOCompletion callback) throws Exception
@@ -865,34 +837,9 @@
}
}
- public void appendUpdateRecord(final long id, final byte recordType, final byte[] record, final boolean sync) throws Exception
- {
- appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync);
- }
-
+ @Override
public void appendUpdateRecord(final long id,
final byte recordType,
- final byte[] record,
- final boolean sync,
- final IOCompletion callback) throws Exception
- {
- appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync, callback);
- }
-
- public void appendUpdateRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync) throws Exception
- {
- SyncIOCompletion callback = getSyncCallback(sync);
-
- appendUpdateRecord(id, recordType, record, sync, callback);
-
- if (callback != null)
- {
- callback.waitCompletion();
- }
- }
-
- public void appendUpdateRecord(final long id,
- final byte recordType,
final EncodingSupport record,
final boolean sync,
final IOCompletion callback) throws Exception
@@ -956,18 +903,8 @@
}
}
- public void appendDeleteRecord(final long id, final boolean sync) throws Exception
- {
- SyncIOCompletion callback = getSyncCallback(sync);
- appendDeleteRecord(id, sync, callback);
-
- if (callback != null)
- {
- callback.waitCompletion();
- }
- }
-
+ @Override
public void appendDeleteRecord(final long id, final boolean sync, final IOCompletion callback) throws Exception
{
checkJournalIsLoaded();
@@ -1035,12 +972,7 @@
}
}
- public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, final byte[] record) throws Exception
- {
- appendAddRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record));
-
- }
-
+ @Override
public void appendAddRecordTransactional(final long txID,
final long id,
final byte recordType,
@@ -1099,17 +1031,10 @@
state = newState;
}
+ @Override
public void appendUpdateRecordTransactional(final long txID,
final long id,
final byte recordType,
- final byte[] record) throws Exception
- {
- appendUpdateRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record));
- }
-
- public void appendUpdateRecordTransactional(final long txID,
- final long id,
- final byte recordType,
final EncodingSupport record) throws Exception
{
checkJournalIsLoaded();
@@ -1151,11 +1076,8 @@
}
}
- public void appendDeleteRecordTransactional(final long txID, final long id, final byte[] record) throws Exception
- {
- appendDeleteRecordTransactional(txID, id, new ByteArrayEncoding(record));
- }
+ @Override
public void appendDeleteRecordTransactional(final long txID, final long id, final EncodingSupport record) throws Exception
{
checkJournalIsLoaded();
@@ -1195,39 +1117,6 @@
}
}
- public void appendDeleteRecordTransactional(final long txID, final long id) throws Exception
- {
- appendDeleteRecordTransactional(txID, id, NullEncoding.instance);
- }
-
- public void appendPrepareRecord(final long txID,
- final byte[] transactionData,
- final boolean sync,
- final IOCompletion completion) throws Exception
- {
- appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync, completion);
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, byte[], boolean)
- */
- public void appendPrepareRecord(final long txID, final byte[] transactionData, final boolean sync) throws Exception
- {
- appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync);
- }
-
- public void appendPrepareRecord(final long txID, final EncodingSupport transactionData, final boolean sync) throws Exception
- {
- SyncIOCompletion syncCompletion = getSyncCallback(sync);
-
- appendPrepareRecord(txID, transactionData, sync, syncCompletion);
-
- if (syncCompletion != null)
- {
- syncCompletion.waitCompletion();
- }
- }
-
/**
*
* <p>If the system crashed after a prepare was called, it should store information that is required to bring the transaction
@@ -1241,6 +1130,7 @@
* @param transactionData extra user data for the prepare
* @throws Exception
*/
+ @Override
public void appendPrepareRecord(final long txID,
final EncodingSupport transactionData,
final boolean sync,
@@ -1287,18 +1177,6 @@
}
}
- public void appendCommitRecord(final long txID, final boolean sync) throws Exception
- {
- SyncIOCompletion syncCompletion = getSyncCallback(sync);
-
- appendCommitRecord(txID, sync, syncCompletion, true);
-
- if (syncCompletion != null)
- {
- syncCompletion.waitCompletion();
- }
- }
-
/* (non-Javadoc)
* @see org.hornetq.core.journal.Journal#lineUpContex(org.hornetq.core.journal.IOCompletion)
*/
@@ -1307,12 +1185,7 @@
callback.storeLineUp();
}
- public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception
- {
- appendCommitRecord(txID, sync, callback, true);
- }
-
/**
* <p>A transaction record (Commit or Prepare), will hold the number of elements the transaction has on each file.</p>
* <p>For example, a transaction was spread along 3 journal files with 10 pendingTransactions on each file.
@@ -1327,9 +1200,8 @@
* <p> If for any reason there are missing pendingTransactions, that means the transaction was not completed and we should ignore the whole transaction </p>
* <p> We can't just use a global counter as reclaiming could delete files after the transaction was successfully committed.
* That also means not having a whole file on journal-reload doesn't mean we have to invalidate the transaction </p>
- *
*/
-
+ @Override
public void appendCommitRecord(final long txID, final boolean sync, final IOCompletion callback, boolean lineUpContext) throws Exception
{
checkJournalIsLoaded();
@@ -1376,19 +1248,7 @@
}
}
- public void appendRollbackRecord(final long txID, final boolean sync) throws Exception
- {
- SyncIOCompletion syncCompletion = getSyncCallback(sync);
-
- appendRollbackRecord(txID, sync, syncCompletion);
-
- if (syncCompletion != null)
- {
- syncCompletion.waitCompletion();
- }
-
- }
-
+ @Override
public void appendRollbackRecord(final long txID, final boolean sync, final IOCompletion callback) throws Exception
{
checkJournalIsLoaded();
@@ -1475,6 +1335,7 @@
{
return load(committedRecords, preparedTransactions, failureCallback, true);
}
+
/**
* @see JournalImpl#load(LoaderCallback)
*/
@@ -3058,25 +2919,6 @@
return tx;
}
- private SyncIOCompletion getSyncCallback(final boolean sync)
- {
- if (fileFactory.isSupportsCallbacks())
- {
- if (sync)
- {
- return new SimpleWaitIOCallback();
- }
- else
- {
- return DummyCallback.getInstance();
- }
- }
- else
- {
- return null;
- }
- }
-
/**
* @return
* @throws Exception
@@ -3171,26 +3013,6 @@
// Inner classes
// ---------------------------------------------------------------------------
- private static class NullEncoding implements EncodingSupport
- {
-
- private static NullEncoding instance = new NullEncoding();
-
- public void decode(final HornetQBuffer buffer)
- {
- }
-
- public void encode(final HornetQBuffer buffer)
- {
- }
-
- public int getEncodeSize()
- {
- return 0;
- }
-
- }
-
// Used on Load
private static class TransactionHolder
{
Modified: branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java 2011-08-09 09:43:05 UTC (rev 11156)
+++ branches/HORNETQ-720_Replication/hornetq-journal/src/main/java/org/hornetq/core/journal/impl/ReplicatingJournal.java 2011-08-09 10:26:46 UTC (rev 11157)
@@ -14,7 +14,6 @@
import org.hornetq.core.journal.PreparedTransactionInfo;
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.TransactionFailureCallback;
-import org.hornetq.core.journal.impl.dataformat.ByteArrayEncoding;
import org.hornetq.core.journal.impl.dataformat.JournalAddRecord;
import org.hornetq.core.journal.impl.dataformat.JournalInternalRecord;
@@ -22,23 +21,22 @@
* Journal used at a replicating backup server during the synchronization of data with the 'live'
* server.
* <p>
- * Its main purpose is to store the data like a Journal would but without verifying records.
+ * Its main purpose is to store the data as a Journal would, but without verifying records.
*/
-public class ReplicatingJournal implements Journal
+public class ReplicatingJournal extends JournalBase implements Journal
{
private final ReentrantLock lockAppend = new ReentrantLock();
private final ReadWriteLock journalLock = new ReentrantReadWriteLock();
- private final JournalFile file;
- private final boolean hasCallbackSupport;
+ private final JournalFile currentFile;
/**
* @param file
*/
public ReplicatingJournal(JournalFile file, boolean hasCallbackSupport)
{
- this.file = file;
- this.hasCallbackSupport = hasCallbackSupport;
+ super(hasCallbackSupport);
+ this.currentFile = file;
}
@Override
@@ -60,32 +58,7 @@
}
// ------------------------
- @Override
- public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception
- {
- appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync);
- }
- @Override
- public void appendAddRecord(long id, byte recordType, byte[] record, boolean sync, IOCompletion completionCallback)
- throws Exception
- {
- appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync, completionCallback);
- }
-
- @Override
- public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception
- {
- SyncIOCompletion callback = getSyncCallback(sync);
-
- appendAddRecord(id, recordType, record, sync, callback);
-
- if (callback != null)
- {
- callback.waitCompletion();
- }
- }
-
// ------------------------
private void readLockJournal()
@@ -99,84 +72,52 @@
}
@Override
- public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync,
- IOCompletion callback) throws Exception
+ public void appendAddRecord(long id, byte recordType, EncodingSupport record, boolean sync, IOCompletion callback)
+ throws Exception
{
JournalInternalRecord addRecord = new JournalAddRecord(true, id, recordType, record);
- if (callback != null)
- {
- callback.storeLineUp();
- }
+ if (callback != null)
+ {
+ callback.storeLineUp();
+ }
- lockAppend.lock();
- try
- {
- JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback);
- }
- finally
- {
- lockAppend.unlock();
- }
-
+ lockAppend.lock();
+ try
+ {
+ appendRecord(addRecord, sync, callback);
+ }
+ finally
+ {
+ lockAppend.unlock();
+ }
}
/**
- * @param addRecord
- * @param b
- * @param sync
- * @param object
- * @param callback
- * @return
+ * Write the record to the current file.
*/
- private JournalFile appendRecord(JournalInternalRecord addRecord, boolean b, boolean sync, Object object,
- IOCompletion callback)
+ private void appendRecord(JournalInternalRecord encoder, boolean sync, IOCompletion callback) throws Exception
{
- // TODO Auto-generated method stub
- return null;
- }
+ encoder.setFileID(currentFile.getRecordID());
- @Override
- public void appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync) throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
+ if (callback != null)
+ {
+ currentFile.getFile().write(encoder, sync, callback);
+ }
+ else
+ {
+ currentFile.getFile().write(encoder, sync);
+ }
}
@Override
- public void
- appendUpdateRecord(long id, byte recordType, byte[] record, boolean sync, IOCompletion completionCallback)
- throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
- public void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync) throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
- public void appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync,
- IOCompletion completionCallback) throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
- public void appendDeleteRecord(long id, boolean sync) throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
public void appendDeleteRecord(long id, boolean sync, IOCompletion completionCallback) throws Exception
{
throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@Override
- public void appendAddRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception
+ public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception
{
throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@@ -189,7 +130,9 @@
}
@Override
- public void appendUpdateRecordTransactional(long txID, long id, byte recordType, byte[] record) throws Exception
+ public void
+ appendUpdateRecord(long id, byte recordType, EncodingSupport record, boolean sync, IOCompletion callback)
+ throws Exception
{
throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
}
@@ -202,36 +145,6 @@
}
@Override
- public void appendDeleteRecordTransactional(long txID, long id, byte[] record) throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
- public void appendDeleteRecordTransactional(long txID, long id, EncodingSupport record) throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
- public void appendDeleteRecordTransactional(long txID, long id) throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
- public void appendCommitRecord(long txID, boolean sync) throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
- public void appendCommitRecord(long txID, boolean sync, IOCompletion callback) throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
public void appendCommitRecord(long txID, boolean sync, IOCompletion callback, boolean lineUpContext)
throws Exception
{
@@ -239,12 +152,6 @@
}
@Override
- public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync) throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
public void appendPrepareRecord(long txID, EncodingSupport transactionData, boolean sync, IOCompletion callback)
throws Exception
{
@@ -252,25 +159,6 @@
}
@Override
- public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync) throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
- public void appendPrepareRecord(long txID, byte[] transactionData, boolean sync, IOCompletion callback)
- throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
- public void appendRollbackRecord(long txID, boolean sync) throws Exception
- {
- throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
- }
-
- @Override
public void appendRollbackRecord(long txID, boolean sync, IOCompletion callback) throws Exception
{
throw new HornetQException(HornetQException.UNSUPPORTED_PACKET);
@@ -331,17 +219,4 @@
{
throw new UnsupportedOperationException();
}
-
- private SyncIOCompletion getSyncCallback(final boolean sync)
- {
- if (hasCallbackSupport)
- {
- if (sync)
- {
- return new SimpleWaitIOCallback();
- }
- return DummyCallback.getInstance();
- }
- return null;
- }
}
13 years, 5 months