Author: borges
Date: 2011-09-19 11:32:57 -0400 (Mon, 19 Sep 2011)
New Revision: 11366
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerSessionImpl.java
Log:
HORNETQ-720 addToPage method to StorageManager interface
So that we use the storageManager lock to control the addition of data to pages.
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-09-19
15:31:38 UTC (rev 11365)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java 2011-09-19
15:32:57 UTC (rev 11366)
@@ -386,28 +386,20 @@
public synchronized void stop() throws Exception
{
- lock(-1);
- try
+ if (running)
{
- if (running)
- {
- running = false;
+ running = false;
- cursorProvider.stop();
+ cursorProvider.stop();
- flushExecutors();
+ flushExecutors();
- if (currentPage != null)
- {
- currentPage.close();
- currentPage = null;
- }
+ if (currentPage != null)
+ {
+ currentPage.close();
+ currentPage = null;
}
}
- finally
- {
- unlock();
- }
}
/** Wait any pending runnable to finish its execution. */
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java 2011-09-19
15:31:38 UTC (rev 11365)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java 2011-09-19
15:32:57 UTC (rev 11366)
@@ -39,6 +39,8 @@
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RouteContextList;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.transaction.ResourceManager;
@@ -244,4 +246,13 @@
*/
void startReplication(ReplicationManager replicationManager, PagingManager
pagingManager) throws Exception;
+ /**
+ * Adds message to page if we are paging.
+ * @return whether we added the message to a page or not.
+ */
+ boolean addToPage(PagingManager pagingManager,
+ SimpleString address,
+ ServerMessage message,
+ RoutingContext ctx,
+ RouteContextList listCtx) throws Exception;
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-19
15:31:38 UTC (rev 11365)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-09-19
15:32:57 UTC (rev 11366)
@@ -86,6 +86,8 @@
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RouteContextList;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.server.impl.ServerMessageImpl;
@@ -162,7 +164,7 @@
public enum JournalContent
{
- BINDINGS((byte)0), MESSAGES((byte)1);
+ BINDINGS((byte)0), MESSAGES((byte)1);
public final byte typeByte;
@@ -220,11 +222,6 @@
private final boolean hasCallbackSupport;
- public JournalStorageManager(final Configuration config, final ExecutorFactory
executorFactory)
- {
- this(config, executorFactory, null);
- }
-
public JournalStorageManager(final Configuration config,
final ExecutorFactory executorFactory,
final ReplicationManager replicator)
@@ -3295,7 +3292,7 @@
public static final class CursorAckRecordEncoding implements EncodingSupport
{
- public CursorAckRecordEncoding(final long queueID, final PagePosition position)
+ private CursorAckRecordEncoding(final long queueID, final PagePosition position)
{
this.queueID = queueID;
this.position = position;
@@ -3629,7 +3626,7 @@
* @param buffer
* @return
*/
- protected static PersistedRoles newSecurityRecord(long id, HornetQBuffer buffer)
+ private static PersistedRoles newSecurityRecord(long id, HornetQBuffer buffer)
{
PersistedRoles roles = new PersistedRoles();
roles.decode(buffer);
@@ -3642,7 +3639,7 @@
* @param buffer
* @return
*/
- protected static PersistedAddressSetting newAddressEncoding(long id, HornetQBuffer
buffer)
+ private static PersistedAddressSetting newAddressEncoding(long id, HornetQBuffer
buffer)
{
PersistedAddressSetting setting = new PersistedAddressSetting();
setting.decode(buffer);
@@ -3688,7 +3685,7 @@
* @param journal
* @throws Exception
*/
- protected static void describeJournal(SequentialFileFactory fileFactory, JournalImpl
journal) throws Exception
+ private static void describeJournal(SequentialFileFactory fileFactory, JournalImpl
journal) throws Exception
{
List<JournalFile> files = journal.orderFiles();
@@ -3896,4 +3893,22 @@
return hasCallbackSupport;
}
+ @Override
+ public boolean addToPage(PagingManager pagingManager,
+ SimpleString address,
+ ServerMessage message,
+ RoutingContext ctx,
+ RouteContextList listCtx) throws Exception
+ {
+ readLock();
+ try
+ {
+ PagingStore store = pagingManager.getPageStore(address);
+ return store.page(message, ctx, listCtx);
+ }
+ finally
+ {
+ readUnLock();
+ }
+ }
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-09-19
15:31:38 UTC (rev 11365)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2011-09-19
15:32:57 UTC (rev 11366)
@@ -43,6 +43,8 @@
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RouteContextList;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.group.impl.GroupBinding;
import org.hornetq.core.transaction.ResourceManager;
@@ -594,4 +596,14 @@
// no-op
}
+ @Override
+ public boolean addToPage(PagingManager manager,
+ SimpleString address,
+ ServerMessage message,
+ RoutingContext ctx,
+ RouteContextList listCtx) throws Exception
+ {
+ return false;
+ }
+
}
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-09-19
15:31:38 UTC (rev 11365)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2011-09-19
15:32:57 UTC (rev 11366)
@@ -583,7 +583,7 @@
return;
}
-
+
if (message.hasInternalProperties())
{
// We need to perform some cleanup on internal properties,
@@ -679,7 +679,7 @@
// arrived the target node
// as described on
https://issues.jboss.org/browse/JBPAPP-6130
ServerMessage copyRedistribute = message.copy(storageManager.generateUniqueID());
-
+
Bindings bindings =
addressManager.getBindingsForRoutingAddress(message.getAddress());
boolean res = false;
@@ -810,7 +810,7 @@
/**
* @param message
*/
- protected void cleanupInternalPropertiesBeforeRouting(final ServerMessage message)
+ private void cleanupInternalPropertiesBeforeRouting(final ServerMessage message)
{
for (SimpleString name : message.getPropertyNames())
{
@@ -845,13 +845,14 @@
private class PageDelivery extends TransactionOperationAbstract
{
- private Set<Queue> queues = new HashSet<Queue>();
+ private final Set<Queue> queues = new HashSet<Queue>();
public void addQueues(List<Queue> queueList)
{
queues.addAll(queueList);
}
+ @Override
public void afterCommit(Transaction tx)
{
// We need to try delivering async after paging, or nothing may start a delivery
after paging since nothing is
@@ -866,6 +867,7 @@
/* (non-Javadoc)
* @see
org.hornetq.core.transaction.TransactionOperation#getRelatedMessageReferences()
*/
+ @Override
public List<MessageReference> getRelatedMessageReferences()
{
return Collections.emptyList();
@@ -881,11 +883,8 @@
for (Map.Entry<SimpleString, RouteContextList> entry :
context.getContexListing().entrySet())
{
- PagingStore store = pagingManager.getPageStore(entry.getKey());
-
- if (store.page(message, context, entry.getValue()))
+ if (storageManager.addToPage(pagingManager, entry.getKey(), message, context,
entry.getValue()))
{
-
// We need to kick delivery so the Queues may check for the cursors case they
are empty
schedulePageDelivery(tx, entry);
continue;
@@ -1064,7 +1063,7 @@
warnMessage.append("Duplicate message detected through the bridge -
message will not be routed. Message information:\n");
warnMessage.append(message.toString());
PostOfficeImpl.log.warn(warnMessage.toString());
-
+
if (context.getTransaction() != null)
{
context.getTransaction().markAsRollbackOnly(new
HornetQException(HornetQException.DUPLICATE_ID_REJECTED, warnMessage.toString()));
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-19
15:31:38 UTC (rev 11365)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-09-19
15:32:57 UTC (rev 11366)
@@ -1304,10 +1304,7 @@
addressSettingsRepository);
}
- /**
- * This method is protected as it may be used as a hook for creating a custom storage
manager (on tests for instance)
- */
- protected StorageManager createStorageManager()
+ private StorageManager createStorageManager()
{
if (configuration.isPersistenceEnabled())
{
Modified:
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
---
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-19
15:31:38 UTC (rev 11365)
+++
branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-09-19
15:32:57 UTC (rev 11366)
@@ -107,7 +107,7 @@
private final boolean strictUpdateDeliveryCount;
- private RemotingConnection remotingConnection;
+ private final RemotingConnection remotingConnection;
private final Map<Long, ServerConsumer> consumers = new
ConcurrentHashMap<Long, ServerConsumer>();
@@ -151,9 +151,9 @@
private OperationContext sessionContext;
// Session's usage should be by definition single threaded, hence it's not
needed to use a concurrentHashMap here
- private Map<SimpleString, Pair<UUID, AtomicLong>> targetAddressInfos = new
HashMap<SimpleString, Pair<UUID, AtomicLong>>();
+ private final Map<SimpleString, Pair<UUID, AtomicLong>> targetAddressInfos
= new HashMap<SimpleString, Pair<UUID, AtomicLong>>();
- private long creationTime = System.currentTimeMillis();
+ private final long creationTime = System.currentTimeMillis();
// Constructors
---------------------------------------------------------------------------------
@@ -448,6 +448,7 @@
run();
}
+ @Override
public String toString()
{
return "Temporary Cleaner for queue " + bindingName;