[hornetq-commits] JBoss hornetq SVN: r8044 - in branches/Replication_Clebert: src/main/org/hornetq/core/persistence/impl/journal and 7 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Mon Oct 5 11:13:57 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-10-05 11:13:57 -0400 (Mon, 05 Oct 2009)
New Revision: 8044
Modified:
branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
branches/Replication_Clebert/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
Changes
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java 2009-10-05 12:54:00 UTC (rev 8043)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java 2009-10-05 15:13:57 UTC (rev 8044)
@@ -44,6 +44,10 @@
{
// Message related operations
+ boolean isReplicated();
+
+ void afterReplicated(Runnable run);
+
UUID getPersistentID();
void setPersistentID(UUID id) throws Exception;
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-05 12:54:00 UTC (rev 8043)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-05 15:13:57 UTC (rev 8044)
@@ -54,6 +54,8 @@
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.remoting.impl.wireformat.XidCodecSupport;
import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.core.replication.ReplicationManager;
+import org.hornetq.core.replication.impl.ReplicatedJournalImpl;
import org.hornetq.core.server.JournalType;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
@@ -116,6 +118,8 @@
private UUID persistentID;
private final BatchingIDGenerator idGenerator;
+
+ private final ReplicationManager replicator;
private final Journal messageJournal;
@@ -145,7 +149,14 @@
public JournalStorageManager(final Configuration config, final Executor executor)
{
+ this (config, executor, null);
+ }
+
+ public JournalStorageManager(final Configuration config, final Executor executor, final ReplicationManager replicator)
+ {
this.executor = executor;
+
+ this.replicator = replicator;
if (config.getJournalType() != JournalType.NIO && config.getJournalType() != JournalType.ASYNCIO)
{
@@ -166,7 +177,7 @@
SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir);
- bindingsJournal = new JournalImpl(1024 * 1024,
+ Journal localBindings = new JournalImpl(1024 * 1024,
2,
config.getJournalCompactMinFiles(),
config.getJournalCompactPercentage(),
@@ -174,6 +185,15 @@
"hornetq-bindings",
"bindings",
1);
+
+ if (replicator != null)
+ {
+ this.bindingsJournal = new ReplicatedJournalImpl((byte)0, localBindings, replicator);
+ }
+ else
+ {
+ this.bindingsJournal = localBindings;
+ }
if (journalDir == null)
{
@@ -218,7 +238,7 @@
this.idGenerator = new BatchingIDGenerator(0, CHECKPOINT_BATCH_SIZE, bindingsJournal);
- messageJournal = new JournalImpl(config.getJournalFileSize(),
+ Journal localMessage = new JournalImpl(config.getJournalFileSize(),
config.getJournalMinFiles(),
config.getJournalCompactMinFiles(),
config.getJournalCompactPercentage(),
@@ -227,13 +247,39 @@
"hq",
config.getJournalMaxAIO());
+
+ if (replicator != null) // replicated: wrap the local message journal so appends also go through the replicator
+ {
+ this.messageJournal = new ReplicatedJournalImpl((byte)1, localMessage, replicator);
+ }
+ else
+ {
+ this.messageJournal = localMessage; // not replicated: use the local message journal itself (was wrongly localBindings)
+ }
+
+
largeMessagesDirectory = config.getLargeMessagesDirectory();
largeMessagesFactory = new NIOSequentialFileFactory(largeMessagesDirectory);
perfBlastPages = config.getJournalPerfBlastPages();
}
+
+ public boolean isReplicated()
+ {
+ return replicator != null;
+ }
+
+ public void afterReplicated(Runnable run)
+ {
+ if (replicator == null)
+ {
+ throw new IllegalStateException("StorageManager is not replicated");
+ }
+ replicator.afterReplicated(run);
+ }
+
public UUID getPersistentID()
{
return persistentID;
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-10-05 12:54:00 UTC (rev 8043)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-10-05 15:13:57 UTC (rev 8044)
@@ -245,4 +245,20 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#afterReplicated(java.lang.Runnable)
+ */
+ public void afterReplicated(Runnable run)
+ {
+ run.run();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#isReplicated()
+ */
+ public boolean isReplicated()
+ {
+ return false;
+ }
+
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-10-05 12:54:00 UTC (rev 8043)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-10-05 15:13:57 UTC (rev 8044)
@@ -46,7 +46,7 @@
void appendRollbackRecord(byte journalID, long txID) throws Exception;
/** Add an action to be executed after the pending replications */
- void addReplicationAction(Runnable runnable);
+ void afterReplicated(Runnable runnable);
void completeToken();
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-10-05 12:54:00 UTC (rev 8043)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-10-05 15:13:57 UTC (rev 8044)
@@ -284,7 +284,7 @@
/* (non-Javadoc)
* @see org.hornetq.core.replication.ReplicationManager#afterReplicated(java.lang.Runnable)
*/
- public void addReplicationAction(final Runnable runnable)
+ public void afterReplicated(final Runnable runnable)
{
getReplicationToken().addReplicationAction(runnable);
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-05 12:54:00 UTC (rev 8043)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-05 15:13:57 UTC (rev 8044)
@@ -887,7 +887,7 @@
{
if (configuration.isPersistenceEnabled())
{
- return new JournalStorageManager(configuration, threadPool);
+ return new JournalStorageManager(configuration, threadPool, replicationManager);
}
else
{
@@ -981,6 +981,9 @@
deploymentManager = new FileDeploymentManager(configuration.getFileDeployerScanPeriod());
}
+
+ startReplication();
+
this.storageManager = createStorageManager();
securityRepository = new HierarchicalObjectRepository<Set<Role>>();
@@ -1123,8 +1126,6 @@
}
}, 0, dumpInfoInterval, TimeUnit.MILLISECONDS);
}
-
- startReplication();
initialised = true;
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-10-05 12:54:00 UTC (rev 8043)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-10-05 15:13:57 UTC (rev 8044)
@@ -239,7 +239,7 @@
boolean durableRef = message.isDurable() && durable;
// If durable, must be persisted before anything is routed
- MessageReference ref = message.createReference(this);
+ final MessageReference ref = message.createReference(this);
PagingStore store = pagingManager.getPageStore(message.getDestination());
@@ -270,8 +270,20 @@
{
storageManager.updateScheduledDeliveryTime(ref);
}
-
- addLast(ref);
+
+ if (storageManager.isReplicated())
+ {
+ storageManager.afterReplicated(new Runnable(){
+ public void run()
+ {
+ addLast(ref);
+ }
+ });
+ }
+ else
+ {
+ addLast(ref);
+ }
}
else
{
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2009-10-05 12:54:00 UTC (rev 8043)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2009-10-05 15:13:57 UTC (rev 8044)
@@ -229,21 +229,51 @@
operation.beforeCommit(this);
}
}
+
+ // TODO: Verify Exception handling here with Tim
+ Runnable execAfterCommit = null;
+
+ if (operations != null)
+ {
+ execAfterCommit = new Runnable()
+ {
+ public void run()
+ {
+ for (TransactionOperation operation : operations)
+ {
+ try
+ {
+ operation.afterCommit(TransactionImpl.this);
+ }
+ catch (Exception e)
+ {
+ log.warn(e.getMessage(), e);
+ }
+ }
+ }
+ };
+ }
if ((getProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT) != null) || (xid != null && state == State.PREPARED))
{
storageManager.commit(id);
- }
-
- state = State.COMMITTED;
-
- if (operations != null)
- {
- for (TransactionOperation operation : operations)
+ state = State.COMMITTED;
+ if (execAfterCommit != null) // there are afterCommit operations to dispatch
{
- operation.afterCommit(this);
+ if (storageManager.isReplicated())
+ {
+ storageManager.afterReplicated(execAfterCommit); // defer until pending replications complete
+ }
+ else // persisted but not replicated: nothing to wait for, run the afterCommit operations now
+ {
+ execAfterCommit.run();
+ }
}
}
+ else if (execAfterCommit != null)
+ {
+ execAfterCommit.run();
+ }
}
}
Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-05 12:54:00 UTC (rev 8043)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-05 15:13:57 UTC (rev 8044)
@@ -169,7 +169,7 @@
replicatedJournal.appendRollbackRecord(3, false);
final CountDownLatch latch = new CountDownLatch(1);
- manager.addReplicationAction(new Runnable()
+ manager.afterReplicated(new Runnable()
{
public void run()
Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-10-05 12:54:00 UTC (rev 8043)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-10-05 15:13:57 UTC (rev 8044)
@@ -16,7 +16,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
@@ -25,8 +24,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-import javax.transaction.xa.Xid;
-
import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.journal.SequentialFile;
import org.hornetq.core.journal.SequentialFileFactory;
@@ -41,26 +38,19 @@
import org.hornetq.core.paging.impl.PagedMessageImpl;
import org.hornetq.core.paging.impl.PagingStoreImpl;
import org.hornetq.core.paging.impl.TestSupportPageStore;
-import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
-import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.remoting.spi.HornetQBuffer;
-import org.hornetq.core.server.LargeServerMessage;
-import org.hornetq.core.server.MessageReference;
-import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.core.transaction.ResourceManager;
import org.hornetq.tests.unit.core.journal.impl.fakes.FakeSequentialFileFactory;
import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.UnitTestCase;
-import org.hornetq.utils.Pair;
import org.hornetq.utils.SimpleString;
-import org.hornetq.utils.UUID;
/**
*
@@ -835,273 +825,9 @@
}
- class FakeStorageManager implements StorageManager
+ class FakeStorageManager extends NullStorageManager
{
- public void setUniqueIDSequence(long id)
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#addQueueBinding(org.hornetq.core.postoffice.Binding)
- */
- public void addQueueBinding(final Binding binding) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#commit(long)
- */
- public void commit(final long txID) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#createLargeMessage()
- */
- public LargeServerMessage createLargeMessage()
- {
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#deleteDuplicateID(long)
- */
- public void deleteDuplicateID(final long recordID) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#deleteDuplicateIDTransactional(long, long)
- */
- public void deleteDuplicateIDTransactional(final long txID, final long recordID) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#deleteMessage(long)
- */
- public void deleteMessage(final long messageID) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#deleteMessageTransactional(long, long, long)
- */
- public void deleteMessageTransactional(final long txID, final long queueID, final long messageID) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#deletePageTransactional(long, long)
- */
- public void deletePageTransactional(final long txID, final long recordID) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#deleteQueueBinding(long)
- */
- public void deleteQueueBinding(final long queueBindingID) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#generateUniqueID()
- */
- public long generateUniqueID()
- {
- return 0;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#getCurrentUniqueID()
- */
- public long getCurrentUniqueID()
- {
- return 0;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#getPersistentID()
- */
- public UUID getPersistentID()
- {
- return null;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#loadBindingJournal(java.util.List)
- */
- public void loadBindingJournal(final List<QueueBindingInfo> queueBindingInfos) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#loadMessageJournal(org.hornetq.core.paging.PagingManager, java.util.Map, org.hornetq.core.transaction.ResourceManager, java.util.Map)
- */
- public void loadMessageJournal(PagingManager pagingManager,
- ResourceManager resourceManager,
- Map<Long, Queue> queues,
- Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#prepare(long, javax.transaction.xa.Xid)
- */
- public void prepare(final long txID, final Xid xid) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#rollback(long)
- */
- public void rollback(final long txID) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#setPersistentID(org.hornetq.utils.UUID)
- */
- public void setPersistentID(final UUID id) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#storeAcknowledge(long, long)
- */
- public void storeAcknowledge(final long queueID, final long messageID) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#storeAcknowledgeTransactional(long, long, long)
- */
- public void storeAcknowledgeTransactional(final long txID, final long queueID, final long messageID) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#storeDuplicateID(org.hornetq.utils.SimpleString, byte[], long)
- */
- public void storeDuplicateID(final SimpleString address, final byte[] duplID, final long recordID) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#storeDuplicateIDTransactional(long, org.hornetq.utils.SimpleString, byte[], long)
- */
- public void storeDuplicateIDTransactional(final long txID,
- final SimpleString address,
- final byte[] duplID,
- final long recordID) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#storeMessage(org.hornetq.core.server.ServerMessage)
- */
- public void storeMessage(final ServerMessage message) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#storeMessageTransactional(long, org.hornetq.core.server.ServerMessage)
- */
- public void storeMessageTransactional(final long txID, final ServerMessage message) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#storePageTransaction(long, org.hornetq.core.paging.PageTransactionInfo)
- */
- public void storePageTransaction(final long txID, final PageTransactionInfo pageTransaction) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#storeReference(long, long)
- */
- public void storeReference(final long queueID, final long messageID) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#storeReferenceTransactional(long, long, long)
- */
- public void storeReferenceTransactional(final long txID, final long queueID, final long messageID) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#updateDeliveryCount(org.hornetq.core.server.MessageReference)
- */
- public void updateDeliveryCount(final MessageReference ref) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#updateDuplicateID(org.hornetq.utils.SimpleString, byte[], long)
- */
- public void updateDuplicateID(final SimpleString address, final byte[] duplID, final long recordID) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#updateDuplicateIDTransactional(long, org.hornetq.utils.SimpleString, byte[], long)
- */
- public void updateDuplicateIDTransactional(final long txID,
- final SimpleString address,
- final byte[] duplID,
- final long recordID) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#updateScheduledDeliveryTime(org.hornetq.core.server.MessageReference)
- */
- public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#updateScheduledDeliveryTimeTransactional(long, org.hornetq.core.server.MessageReference)
- */
- public void updateScheduledDeliveryTimeTransactional(final long txID, final MessageReference ref) throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQComponent#isStarted()
- */
- public boolean isStarted()
- {
- return false;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQComponent#start()
- */
- public void start() throws Exception
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQComponent#stop()
- */
- public void stop() throws Exception
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.persistence.StorageManager#loadInternalOnly()
- */
- public void loadInternalOnly() throws Exception
- {
- }
-
}
class FakeStoreFactory implements PagingStoreFactory
More information about the hornetq-commits
mailing list