[hornetq-commits] JBoss hornetq SVN: r11366 - in branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core: persistence and 4 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Mon Sep 19 11:32:57 EDT 2011


Author: borges
Date: 2011-09-19 11:32:57 -0400 (Mon, 19 Sep 2011)
New Revision: 11366

Modified:
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
   branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerSessionImpl.java
Log:
HORNETQ-720 Add an addToPage method to the StorageManager interface,
so that the storageManager lock is used to control the addition of data to pages.

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java	2011-09-19 15:31:38 UTC (rev 11365)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/paging/impl/PagingStoreImpl.java	2011-09-19 15:32:57 UTC (rev 11366)
@@ -386,28 +386,20 @@
 
    public synchronized void stop() throws Exception
    {
-      lock(-1);
-      try
+      if (running)
       {
-         if (running)
-         {
-            running = false;
+         running = false;
 
-            cursorProvider.stop();
+         cursorProvider.stop();
 
-            flushExecutors();
+         flushExecutors();
 
-            if (currentPage != null)
-            {
-               currentPage.close();
-               currentPage = null;
-            }
+         if (currentPage != null)
+         {
+            currentPage.close();
+            currentPage = null;
          }
       }
-      finally
-      {
-         unlock();
-      }
    }
 
    /** Wait any pending runnable to finish its execution. */

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java	2011-09-19 15:31:38 UTC (rev 11365)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/StorageManager.java	2011-09-19 15:32:57 UTC (rev 11366)
@@ -39,6 +39,8 @@
 import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RouteContextList;
+import org.hornetq.core.server.RoutingContext;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.group.impl.GroupBinding;
 import org.hornetq.core.transaction.ResourceManager;
@@ -244,4 +246,13 @@
     */
    void startReplication(ReplicationManager replicationManager, PagingManager pagingManager) throws Exception;
 
+   /**
+    * Adds message to page if we are paging.
+    * @return whether we added the message to a page or not.
+    */
+   boolean addToPage(PagingManager pagingManager,
+      SimpleString address,
+      ServerMessage message,
+      RoutingContext ctx,
+      RouteContextList listCtx) throws Exception;
 }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-09-19 15:31:38 UTC (rev 11365)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2011-09-19 15:32:57 UTC (rev 11366)
@@ -86,6 +86,8 @@
 import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RouteContextList;
+import org.hornetq.core.server.RoutingContext;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.group.impl.GroupBinding;
 import org.hornetq.core.server.impl.ServerMessageImpl;
@@ -162,7 +164,7 @@
 
    public enum JournalContent
    {
-      BINDINGS((byte)0),      MESSAGES((byte)1);
+      BINDINGS((byte)0), MESSAGES((byte)1);
 
       public final byte typeByte;
 
@@ -220,11 +222,6 @@
 
    private final boolean hasCallbackSupport;
 
-   public JournalStorageManager(final Configuration config, final ExecutorFactory executorFactory)
-   {
-      this(config, executorFactory, null);
-   }
-
    public JournalStorageManager(final Configuration config,
                                 final ExecutorFactory executorFactory,
                                 final ReplicationManager replicator)
@@ -3295,7 +3292,7 @@
 
    public static final class CursorAckRecordEncoding implements EncodingSupport
    {
-      public CursorAckRecordEncoding(final long queueID, final PagePosition position)
+      private CursorAckRecordEncoding(final long queueID, final PagePosition position)
       {
          this.queueID = queueID;
          this.position = position;
@@ -3629,7 +3626,7 @@
     * @param buffer
     * @return
     */
-   protected static PersistedRoles newSecurityRecord(long id, HornetQBuffer buffer)
+   private static PersistedRoles newSecurityRecord(long id, HornetQBuffer buffer)
    {
       PersistedRoles roles = new PersistedRoles();
       roles.decode(buffer);
@@ -3642,7 +3639,7 @@
     * @param buffer
     * @return
     */
-   protected static PersistedAddressSetting newAddressEncoding(long id, HornetQBuffer buffer)
+   private static PersistedAddressSetting newAddressEncoding(long id, HornetQBuffer buffer)
    {
       PersistedAddressSetting setting = new PersistedAddressSetting();
       setting.decode(buffer);
@@ -3688,7 +3685,7 @@
     * @param journal
     * @throws Exception
     */
-   protected static void describeJournal(SequentialFileFactory fileFactory, JournalImpl journal) throws Exception
+   private static void describeJournal(SequentialFileFactory fileFactory, JournalImpl journal) throws Exception
    {
       List<JournalFile> files = journal.orderFiles();
 
@@ -3896,4 +3893,22 @@
       return hasCallbackSupport;
    }
 
+   @Override
+   public boolean addToPage(PagingManager pagingManager,
+      SimpleString address,
+      ServerMessage message,
+      RoutingContext ctx,
+      RouteContextList listCtx) throws Exception
+   {
+      readLock();
+      try
+      {
+         PagingStore store = pagingManager.getPageStore(address);
+         return store.page(message, ctx, listCtx);
+      }
+      finally
+      {
+         readUnLock();
+      }
+   }
 }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2011-09-19 15:31:38 UTC (rev 11365)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java	2011-09-19 15:32:57 UTC (rev 11366)
@@ -43,6 +43,8 @@
 import org.hornetq.core.server.LargeServerMessage;
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RouteContextList;
+import org.hornetq.core.server.RoutingContext;
 import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.group.impl.GroupBinding;
 import org.hornetq.core.transaction.ResourceManager;
@@ -594,4 +596,14 @@
       // no-op
    }
 
+   @Override
+   public boolean addToPage(PagingManager manager,
+      SimpleString address,
+      ServerMessage message,
+      RoutingContext ctx,
+      RouteContextList listCtx) throws Exception
+   {
+      return false;
+   }
+
 }

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2011-09-19 15:31:38 UTC (rev 11365)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2011-09-19 15:32:57 UTC (rev 11366)
@@ -583,7 +583,7 @@
          return;
       }
 
-      
+
       if (message.hasInternalProperties())
       {
          // We need to perform some cleanup on internal properties,
@@ -679,7 +679,7 @@
       // arrived the target node
       // as described on https://issues.jboss.org/browse/JBPAPP-6130
       ServerMessage copyRedistribute = message.copy(storageManager.generateUniqueID());
-      
+
       Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getAddress());
 
       boolean res = false;
@@ -810,7 +810,7 @@
    /**
     * @param message
     */
-   protected void cleanupInternalPropertiesBeforeRouting(final ServerMessage message)
+   private void cleanupInternalPropertiesBeforeRouting(final ServerMessage message)
    {
       for (SimpleString name : message.getPropertyNames())
       {
@@ -845,13 +845,14 @@
 
    private class PageDelivery extends TransactionOperationAbstract
    {
-      private Set<Queue> queues = new HashSet<Queue>();
+      private final Set<Queue> queues = new HashSet<Queue>();
 
       public void addQueues(List<Queue> queueList)
       {
          queues.addAll(queueList);
       }
 
+      @Override
       public void afterCommit(Transaction tx)
       {
          // We need to try delivering async after paging, or nothing may start a delivery after paging since nothing is
@@ -866,6 +867,7 @@
       /* (non-Javadoc)
        * @see org.hornetq.core.transaction.TransactionOperation#getRelatedMessageReferences()
        */
+      @Override
       public List<MessageReference> getRelatedMessageReferences()
       {
          return Collections.emptyList();
@@ -881,11 +883,8 @@
 
       for (Map.Entry<SimpleString, RouteContextList> entry : context.getContexListing().entrySet())
       {
-         PagingStore store = pagingManager.getPageStore(entry.getKey());
-
-         if (store.page(message, context, entry.getValue()))
+         if (storageManager.addToPage(pagingManager, entry.getKey(), message, context, entry.getValue()))
          {
-
             // We need to kick delivery so the Queues may check for the cursors case they are empty
             schedulePageDelivery(tx, entry);
             continue;
@@ -1064,7 +1063,7 @@
             warnMessage.append("Duplicate message detected through the bridge - message will not be routed. Message information:\n");
             warnMessage.append(message.toString());
             PostOfficeImpl.log.warn(warnMessage.toString());
-            
+
             if (context.getTransaction() != null)
             {
                context.getTransaction().markAsRollbackOnly(new HornetQException(HornetQException.DUPLICATE_ID_REJECTED, warnMessage.toString()));

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-09-19 15:31:38 UTC (rev 11365)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-09-19 15:32:57 UTC (rev 11366)
@@ -1304,10 +1304,7 @@
                                    addressSettingsRepository);
    }
 
-   /**
-    * This method is protected as it may be used as a hook for creating a custom storage manager (on tests for instance)
-    */
-   protected StorageManager createStorageManager()
+   private StorageManager createStorageManager()
    {
       if (configuration.isPersistenceEnabled())
       {

Modified: branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerSessionImpl.java	2011-09-19 15:31:38 UTC (rev 11365)
+++ branches/HORNETQ-720_Replication/hornetq-core/src/main/java/org/hornetq/core/server/impl/ServerSessionImpl.java	2011-09-19 15:32:57 UTC (rev 11366)
@@ -107,7 +107,7 @@
 
    private final boolean strictUpdateDeliveryCount;
 
-   private RemotingConnection remotingConnection;
+   private final RemotingConnection remotingConnection;
 
    private final Map<Long, ServerConsumer> consumers = new ConcurrentHashMap<Long, ServerConsumer>();
 
@@ -151,9 +151,9 @@
    private OperationContext sessionContext;
 
    // Session's usage should be by definition single threaded, hence it's not needed to use a concurrentHashMap here
-   private Map<SimpleString, Pair<UUID, AtomicLong>> targetAddressInfos = new HashMap<SimpleString,  Pair<UUID, AtomicLong>>();
+   private final Map<SimpleString, Pair<UUID, AtomicLong>> targetAddressInfos = new HashMap<SimpleString,  Pair<UUID, AtomicLong>>();
    
-   private long creationTime = System.currentTimeMillis();
+   private final long creationTime = System.currentTimeMillis();
 
    // Constructors ---------------------------------------------------------------------------------
 
@@ -448,6 +448,7 @@
          run();
       }
       
+      @Override
       public String toString()
       {
          return "Temporary Cleaner for queue " + bindingName;



More information about the hornetq-commits mailing list