[hornetq-commits] JBoss hornetq SVN: r8074 - in branches/Replication_Clebert: src/main/org/hornetq/core/persistence and 3 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Oct 8 22:45:06 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-10-08 22:45:05 -0400 (Thu, 08 Oct 2009)
New Revision: 8074
Modified:
branches/Replication_Clebert/src/main/org/hornetq/core/paging/impl/PageImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
fixes
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/paging/impl/PageImpl.java 2009-10-09 00:45:01 UTC (rev 8073)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/paging/impl/PageImpl.java 2009-10-09 02:45:05 UTC (rev 8074)
@@ -191,7 +191,10 @@
public void close() throws Exception
{
- storageManager.pageClosed(storeName, pageId);
+ if (storageManager != null)
+ {
+ storageManager.pageClosed(storeName, pageId);
+ }
file.close();
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java 2009-10-09 00:45:01 UTC (rev 8073)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java 2009-10-09 02:45:05 UTC (rev 8074)
@@ -18,7 +18,6 @@
import javax.transaction.xa.Xid;
-import org.hornetq.core.buffers.ChannelBuffer;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-10-09 00:45:01 UTC (rev 8073)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-10-09 02:45:05 UTC (rev 8074)
@@ -87,6 +87,8 @@
import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationPageEventMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationPageWriteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.RollbackMessage;
@@ -410,12 +412,12 @@
}
case REPLICATION_PAGE_WRITE:
{
- packet = new ReplicationResponseMessage();
+ packet = new ReplicationPageWriteMessage();
break;
}
case REPLICATION_PAGE_EVENT:
{
- packet = new ReplicationResponseMessage();
+ packet = new ReplicationPageEventMessage();
break;
}
default:
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-10-09 00:45:01 UTC (rev 8073)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-10-09 02:45:05 UTC (rev 8074)
@@ -299,6 +299,12 @@
ConcurrentMap<Integer, Page> pages = getPageMap(packet.getStoreName());
Page page = pages.remove(packet.getPageNumber());
+
+ if (page == null)
+ {
+ page = getPage(packet.getStoreName(), packet.getPageNumber());
+ }
+
if (page != null)
{
@@ -331,7 +337,12 @@
if (resultIndex == null)
{
- resultIndex = pageIndex.putIfAbsent(storeName, new ConcurrentHashMap<Integer, Page>());
+ resultIndex = new ConcurrentHashMap<Integer, Page>();
+ ConcurrentMap<Integer, Page> mapResult = pageIndex.putIfAbsent(storeName, resultIndex);
+ if (mapResult != null)
+ {
+ resultIndex = mapResult;
+ }
}
return resultIndex;
@@ -365,6 +376,7 @@
if (page == null)
{
page = pageManager.getPageStore(storeName).createPage(pageId);
+ page.open();
map.put(pageId, page);
}
Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-09 00:45:01 UTC (rev 8073)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-09 02:45:05 UTC (rev 8074)
@@ -27,6 +27,7 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.client.impl.ConnectionManager;
import org.hornetq.core.client.impl.ConnectionManagerImpl;
@@ -40,6 +41,13 @@
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.TransactionFailureCallback;
import org.hornetq.core.management.ManagementService;
+import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.paging.impl.PagedMessageImpl;
+import org.hornetq.core.paging.impl.PagingManagerImpl;
+import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
+import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
import org.hornetq.core.remoting.Interceptor;
@@ -50,10 +58,15 @@
import org.hornetq.core.replication.impl.ReplicatedJournal;
import org.hornetq.core.replication.impl.ReplicationManagerImpl;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.HornetQServerImpl;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.core.settings.HierarchicalRepository;
+import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.HornetQThreadFactory;
+import org.hornetq.utils.SimpleString;
/**
* A ReplicationTest
@@ -168,17 +181,8 @@
replicatedJournal.appendPrepareRecord(3, new FakeData(), false);
replicatedJournal.appendRollbackRecord(3, false);
- final CountDownLatch latch = new CountDownLatch(1);
- manager.afterReplicated(new Runnable()
- {
+ blockOnReplication(manager);
- public void run()
- {
- latch.countDown();
- }
-
- });
- assertTrue(latch.await(1, TimeUnit.SECONDS));
assertEquals(1, manager.getActiveTokens().size());
manager.completeToken();
@@ -194,6 +198,47 @@
}
assertEquals(0, manager.getActiveTokens().size());
+
+ ServerMessage msg = new ServerMessageImpl();
+
+ SimpleString dummy = new SimpleString("dummy");
+ msg.setDestination(dummy);
+ msg.setBody(ChannelBuffers.wrappedBuffer(new byte[10]));
+
+ replicatedJournal.appendAddRecordTransactional(23, 24, (byte)1, new FakeData());
+
+ PagedMessage pgmsg = new PagedMessageImpl(msg, -1);
+ manager.pageWrite(pgmsg, 1);
+ manager.pageWrite(pgmsg, 2);
+ manager.pageWrite(pgmsg, 3);
+ manager.pageWrite(pgmsg, 4);
+
+ blockOnReplication(manager);
+
+ PagingManager pagingManager = createPageManager(server.getStorageManager(),
+ server.getConfiguration(),
+ server.getExecutorFactory(),
+ server.getAddressSettingsRepository());
+
+ PagingStore store = pagingManager.getPageStore(dummy);
+ store.start();
+ assertEquals(5, store.getNumberOfPages());
+ store.stop();
+
+ manager.pageDeleted(dummy, 1);
+ manager.pageDeleted(dummy, 2);
+ manager.pageDeleted(dummy, 3);
+ manager.pageDeleted(dummy, 4);
+ manager.pageDeleted(dummy, 5);
+ manager.pageDeleted(dummy, 6);
+
+
+ blockOnReplication(manager);
+
+ store.start();
+
+ assertEquals(0, store.getNumberOfPages());
+
manager.stop();
}
finally
@@ -201,7 +246,27 @@
server.stop();
}
}
-
+
+ /**
+ * @param manager
+ * @return
+ */
+ private void blockOnReplication(ReplicationManagerImpl manager) throws Exception
+ {
+ final CountDownLatch latch = new CountDownLatch(1);
+ manager.afterReplicated(new Runnable()
+ {
+
+ public void run()
+ {
+ latch.countDown();
+ }
+
+ });
+
+ assertTrue(latch.await(30, TimeUnit.SECONDS));
+ }
+
public void testNoActions() throws Exception
{
@@ -359,6 +424,22 @@
}
+ protected PagingManager createPageManager(StorageManager storageManager,
+ Configuration configuration,
+ ExecutorFactory executorFactory,
+ HierarchicalRepository<AddressSettings> addressSettingsRepository) throws Exception
+ {
+
+ PagingManager paging = new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(),
+ executorFactory),
+ storageManager,
+ addressSettingsRepository,
+ false);
+
+ paging.start();
+ return paging;
+ }
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
More information about the hornetq-commits mailing list