Author: borges
Date: 2011-08-09 06:31:20 -0400 (Tue, 09 Aug 2011)
New Revision: 11161
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
Log:
HORNETQ-720 Avoid having two instances of JournalStorageManager in a replicated backup
server.
Also added missing methods to StorageManager to avoid using JournalStorageManager.
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java 2011-08-09
10:30:29 UTC (rev 11160)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java 2011-08-09
10:31:20 UTC (rev 11161)
@@ -22,6 +22,7 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.paging.PageTransactionInfo;
@@ -41,9 +42,9 @@
import org.hornetq.core.transaction.ResourceManager;
/**
- *
+ *
* A StorageManager
- *
+ *
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic@jboss.com">Clebert
Suconic</a>
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
@@ -54,12 +55,12 @@
/** Get the context associated with the thread for later reuse */
OperationContext getContext();
-
+
void lineUpContext();
/** It just creates an OperationContext without associating it */
OperationContext newContext(Executor executor);
-
+
OperationContext newSingleThreadContext();
/** Set the context back to the thread */
@@ -77,20 +78,20 @@
void afterCompleteOperations(IOAsyncTask run);
- /** Block until the operations are done.
+ /** Block until the operations are done.
* Warning: Don't use it inside an ordered executor, otherwise the system may
lock up
* if the pools are full
* @throws Exception */
boolean waitOnOperations(long timeout) throws Exception;
- /** Block until the operations are done.
+ /** Block until the operations are done.
* Warning: Don't use it inside an ordered executor, otherwise the system may
lock up
* if the pools are full
* @throws Exception */
void waitOnOperations() throws Exception;
void clearContext();
-
+
long generateUniqueID();
long getCurrentUniqueID();
@@ -102,7 +103,7 @@
void deleteMessage(long messageID) throws Exception;
void storeAcknowledge(long queueID, long messageID) throws Exception;
-
+
void storeCursorAcknowledge(long queueID, PagePosition position) throws Exception;
void updateDeliveryCount(MessageReference ref) throws Exception;
@@ -120,7 +121,7 @@
void storeAcknowledgeTransactional(long txID, long queueID, long messageID) throws
Exception;
void storeCursorAcknowledgeTransactional(long txID, long queueID, PagePosition
position) throws Exception;
-
+
void deleteCursorAcknowledgeTransactional(long txID, long ackID) throws Exception;
void updateScheduledDeliveryTimeTransactional(long txID, MessageReference ref) throws
Exception;
@@ -136,9 +137,9 @@
LargeServerMessage createLargeMessage();
/**
- *
+ *
* @param id
- * @param message This is a temporary message that holds the parsed properties.
+ * @param message This is a temporary message that holds the parsed properties.
* The remoting layer can't create a ServerMessage directly, then this will
be replaced.
* @return
*/
@@ -153,9 +154,9 @@
void rollback(long txID) throws Exception;
void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws
Exception;
-
+
void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int
depage) throws Exception;
-
+
void updatePageTransaction(PageTransactionInfo pageTransaction, int depage) throws
Exception;
void deletePageTransactional(long recordID) throws Exception;
@@ -187,39 +188,53 @@
void addGrouping(GroupBinding groupBinding) throws Exception;
void deleteGrouping(GroupBinding groupBinding) throws Exception;
-
+
void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception;
-
+
void deleteAddressSetting(SimpleString addressMatch) throws Exception;
-
+
List<PersistedAddressSetting> recoverAddressSettings() throws Exception;
-
+
void storeSecurityRoles(PersistedRoles persistedRoles) throws Exception;
-
+
void deleteSecurityRoles(SimpleString addressMatch) throws Exception;
List<PersistedRoles> recoverPersistedRoles() throws Exception;
-
- /**
+
+ /**
* @return The ID with the stored counter
*/
long storePageCounter(long txID, long queueID, long value) throws Exception;
-
+
void deleteIncrementRecord(long txID, long recordID) throws Exception;
-
+
void deletePageCounter(long txID, long recordID) throws Exception;
/**
* @return the ID with the increment record
- * @throws Exception
+ * @throws Exception
*/
long storePageCounterInc(long txID, long queueID, int add) throws Exception;
-
+
/**
* @return the ID with the increment record
- * @throws Exception
+ * @throws Exception
*/
long storePageCounterInc(long queueID, int add) throws Exception;
-
-
+
+ /**
+ * @return
+ */
+ boolean hasCallbackSupport();
+
+ /**
+ * @return the bindings journal
+ */
+ Journal getBindingsJournal();
+
+ /**
+ * @return the message journal
+ */
+ Journal getMessageJournal();
+
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-08-09
10:30:29 UTC (rev 11160)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-08-09
10:31:20 UTC (rev 11161)
@@ -24,6 +24,7 @@
import org.hornetq.api.core.Pair;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.journal.IOAsyncTask;
+import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.JournalLoadInformation;
import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.core.paging.PageTransactionInfo;
@@ -47,9 +48,9 @@
import org.hornetq.core.transaction.ResourceManager;
/**
- *
+ *
* A NullStorageManager
- *
+ *
* @author <a href="mailto:ataylor@redhat.com">Andy Taylor</a>
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*
@@ -59,47 +60,47 @@
private final AtomicLong idSequence = new AtomicLong(0);
private volatile boolean started;
-
+
private static final OperationContext dummyContext = new OperationContext()
{
-
+
public void onError(int errorCode, String errorMessage)
{
}
-
+
public void done()
{
}
-
+
public void storeLineUp()
{
}
-
+
public boolean waitCompletion(long timeout) throws Exception
{
return true;
}
-
+
public void waitCompletion() throws Exception
{
}
-
+
public void replicationLineUp()
{
}
-
+
public void replicationDone()
{
}
-
+
public void pageSyncLineUp()
{
}
-
+
public void pageSyncDone()
{
}
-
+
public void executeOnCompletion(IOAsyncTask runnable)
{
runnable.done();
@@ -235,7 +236,7 @@
public LargeServerMessage createLargeMessage(final long id, final MessageInternal
message)
{
NullStorageLargeServerMessage largeMessage = new NullStorageLargeServerMessage();
-
+
largeMessage.copyHeadersAndProperties(message);
largeMessage.setMessageID(id);
@@ -407,8 +408,8 @@
{
return dummyContext;
}
-
-
+
+
public OperationContext newSingleThreadContext()
{
return dummyContext;
@@ -566,7 +567,25 @@
public void lineUpContext()
{
// TODO Auto-generated method stub
-
+
}
+ @Override
+ public boolean hasCallbackSupport()
+ {
+ return false;
+ }
+
+ @Override
+ public Journal getBindingsJournal()
+ {
+ return null;
+ }
+
+ @Override
+ public Journal getMessageJournal()
+ {
+ return null;
+ }
+
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-09
10:30:29 UTC (rev 11160)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2011-08-09
10:31:20 UTC (rev 11161)
@@ -35,7 +35,7 @@
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
-import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager.JournalContent;
import org.hornetq.core.protocol.core.Channel;
import org.hornetq.core.protocol.core.Packet;
@@ -90,7 +90,7 @@
/** Used to hold the real Journals before the backup is synchronized. */
private final Map<JournalContent, Journal> journalsHolder = new
HashMap<JournalContent, Journal>();
- private JournalStorageManager storage;
+ private StorageManager storage;
private PagingManager pageManager;
@@ -234,7 +234,7 @@
{
Configuration config = server.getConfiguration();
- storage = new JournalStorageManager(config, server.getExecutorFactory());
+ storage = server.getStorageManager();
storage.start();
server.getManagementService().setStorageManager(storage);