[hornetq-commits] JBoss hornetq SVN: r8074 - in branches/Replication_Clebert: src/main/org/hornetq/core/persistence and 3 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Oct 8 22:45:06 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-10-08 22:45:05 -0400 (Thu, 08 Oct 2009)
New Revision: 8074

Modified:
   branches/Replication_Clebert/src/main/org/hornetq/core/paging/impl/PageImpl.java
   branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java
   branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
   branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
   branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
fixes

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/paging/impl/PageImpl.java	2009-10-09 00:45:01 UTC (rev 8073)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/paging/impl/PageImpl.java	2009-10-09 02:45:05 UTC (rev 8074)
@@ -191,7 +191,10 @@
 
    public void close() throws Exception
    {
-      storageManager.pageClosed(storeName, pageId);
+      if (storageManager != null)
+      {
+         storageManager.pageClosed(storeName, pageId);
+      }
       file.close();
    }
 

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java	2009-10-09 00:45:01 UTC (rev 8073)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java	2009-10-09 02:45:05 UTC (rev 8074)
@@ -18,7 +18,6 @@
 
 import javax.transaction.xa.Xid;
 
-import org.hornetq.core.buffers.ChannelBuffer;
 import org.hornetq.core.paging.PageTransactionInfo;
 import org.hornetq.core.paging.PagedMessage;
 import org.hornetq.core.paging.PagingManager;

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java	2009-10-09 00:45:01 UTC (rev 8073)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java	2009-10-09 02:45:05 UTC (rev 8074)
@@ -87,6 +87,8 @@
 import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationPageEventMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationPageWriteMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
 import org.hornetq.core.remoting.impl.wireformat.ReplicationResponseMessage;
 import org.hornetq.core.remoting.impl.wireformat.RollbackMessage;
@@ -410,12 +412,12 @@
          }
          case REPLICATION_PAGE_WRITE:
          {
-            packet = new ReplicationResponseMessage();
+            packet = new ReplicationPageWriteMessage();
             break;
          }
          case REPLICATION_PAGE_EVENT:
          {
-            packet = new ReplicationResponseMessage();
+            packet = new ReplicationPageEventMessage();
             break;
          }
          default:

Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2009-10-09 00:45:01 UTC (rev 8073)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java	2009-10-09 02:45:05 UTC (rev 8074)
@@ -299,6 +299,12 @@
       ConcurrentMap<Integer, Page> pages = getPageMap(packet.getStoreName());
 
       Page page = pages.remove(packet.getPageNumber());
+      
+      if (page == null)
+      {
+         page = getPage(packet.getStoreName(), packet.getPageNumber());
+      }
+      
 
       if (page != null)
       {
@@ -331,7 +337,12 @@
 
       if (resultIndex == null)
       {
-         resultIndex = pageIndex.putIfAbsent(storeName, new ConcurrentHashMap<Integer, Page>());
+         resultIndex = new ConcurrentHashMap<Integer, Page>();
+         ConcurrentMap<Integer, Page> mapResult = pageIndex.putIfAbsent(storeName, resultIndex);
+         if (mapResult != null)
+         {
+            resultIndex = mapResult;
+         }
       }
 
       return resultIndex;
@@ -365,6 +376,7 @@
       if (page == null)
       {
          page = pageManager.getPageStore(storeName).createPage(pageId);
+         page.open();
          map.put(pageId, page);
       }
 

Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-10-09 00:45:01 UTC (rev 8073)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java	2009-10-09 02:45:05 UTC (rev 8074)
@@ -27,6 +27,7 @@
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
+import org.hornetq.core.buffers.ChannelBuffers;
 import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
 import org.hornetq.core.client.impl.ConnectionManager;
 import org.hornetq.core.client.impl.ConnectionManagerImpl;
@@ -40,6 +41,13 @@
 import org.hornetq.core.journal.RecordInfo;
 import org.hornetq.core.journal.TransactionFailureCallback;
 import org.hornetq.core.management.ManagementService;
+import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.paging.impl.PagedMessageImpl;
+import org.hornetq.core.paging.impl.PagingManagerImpl;
+import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
+import org.hornetq.core.persistence.StorageManager;
 import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.remoting.ChannelHandler;
 import org.hornetq.core.remoting.Interceptor;
@@ -50,10 +58,15 @@
 import org.hornetq.core.replication.impl.ReplicatedJournal;
 import org.hornetq.core.replication.impl.ReplicationManagerImpl;
 import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.ServerMessage;
 import org.hornetq.core.server.impl.HornetQServerImpl;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.core.settings.HierarchicalRepository;
+import org.hornetq.core.settings.impl.AddressSettings;
 import org.hornetq.tests.util.ServiceTestBase;
 import org.hornetq.utils.ExecutorFactory;
 import org.hornetq.utils.HornetQThreadFactory;
+import org.hornetq.utils.SimpleString;
 
 /**
  * A ReplicationTest
@@ -168,17 +181,8 @@
          replicatedJournal.appendPrepareRecord(3, new FakeData(), false);
          replicatedJournal.appendRollbackRecord(3, false);
 
-         final CountDownLatch latch = new CountDownLatch(1);
-         manager.afterReplicated(new Runnable()
-         {
+         blockOnReplication(manager);
 
-            public void run()
-            {
-               latch.countDown();
-            }
-
-         });
-         assertTrue(latch.await(1, TimeUnit.SECONDS));
          assertEquals(1, manager.getActiveTokens().size());
 
          manager.completeToken();
@@ -194,6 +198,47 @@
          }
 
          assertEquals(0, manager.getActiveTokens().size());
+
+         ServerMessage msg = new ServerMessageImpl();
+
+         SimpleString dummy = new SimpleString("dummy");
+         msg.setDestination(dummy);
+         msg.setBody(ChannelBuffers.wrappedBuffer(new byte[10]));
+
+         replicatedJournal.appendAddRecordTransactional(23, 24, (byte)1, new FakeData());
+
+         PagedMessage pgmsg = new PagedMessageImpl(msg, -1);
+         manager.pageWrite(pgmsg, 1);
+         manager.pageWrite(pgmsg, 2);
+         manager.pageWrite(pgmsg, 3);
+         manager.pageWrite(pgmsg, 4);
+
+         blockOnReplication(manager);
+
+         PagingManager pagingManager = createPageManager(server.getStorageManager(),
+                                                         server.getConfiguration(),
+                                                         server.getExecutorFactory(),
+                                                         server.getAddressSettingsRepository());
+         
+         PagingStore store = pagingManager.getPageStore(dummy);
+         store.start();
+         assertEquals(5, store.getNumberOfPages());
+         store.stop();
+         
+         manager.pageDeleted(dummy, 1);
+         manager.pageDeleted(dummy, 2);
+         manager.pageDeleted(dummy, 3);
+         manager.pageDeleted(dummy, 4);
+         manager.pageDeleted(dummy, 5);
+         manager.pageDeleted(dummy, 6);
+         
+
+         blockOnReplication(manager);
+         
+         store.start();
+         
+         assertEquals(0, store.getNumberOfPages());
+
          manager.stop();
       }
       finally
@@ -201,7 +246,27 @@
          server.stop();
       }
    }
-   
+
+   /**
+    * @param manager
+    * @return
+    */
+   private void blockOnReplication(ReplicationManagerImpl manager) throws Exception
+   {
+      final CountDownLatch latch = new CountDownLatch(1);
+      manager.afterReplicated(new Runnable()
+      {
+
+         public void run()
+         {
+            latch.countDown();
+         }
+
+      });
+      
+      assertTrue(latch.await(30, TimeUnit.SECONDS));
+   }
+
    public void testNoActions() throws Exception
    {
 
@@ -359,6 +424,22 @@
 
    }
 
+   protected PagingManager createPageManager(StorageManager storageManager,
+                                             Configuration configuration,
+                                             ExecutorFactory executorFactory,
+                                             HierarchicalRepository<AddressSettings> addressSettingsRepository) throws Exception
+   {
+
+      PagingManager paging = new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(),
+                                                                             executorFactory),
+                                                   storageManager,
+                                                   addressSettingsRepository,
+                                                   false);
+
+      paging.start();
+      return paging;
+   }
+
    // Private -------------------------------------------------------
 
    // Inner classes -------------------------------------------------



More information about the hornetq-commits mailing list