Author: clebert.suconic(a)jboss.com
Date: 2009-12-06 01:58:34 -0500 (Sun, 06 Dec 2009)
New Revision: 8586
Modified:
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
trunk/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java
Log:
Fixed bug with duplicate detection on depage and transactional send
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-12-05 19:37:17
UTC (rev 8585)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-12-06 06:58:34
UTC (rev 8586)
@@ -49,6 +49,7 @@
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
+import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.utils.SimpleString;
@@ -282,7 +283,7 @@
}
public void addSize(final ServerMessage message, final boolean add)
- {
+ {
long size = message.getMemoryEstimate();
if (add)
@@ -302,7 +303,7 @@
public void addSize(final MessageReference reference, final boolean add)
{
long size = MessageReferenceImpl.getMemoryEstimate();
-
+
if (add)
{
checkReleaseProducerFlowControlCredits(size);
@@ -400,9 +401,9 @@
if (running)
{
running = false;
-
+
final CountDownLatch latch = new CountDownLatch(1);
-
+
executor.execute(new Runnable()
{
public void run()
@@ -410,10 +411,10 @@
latch.countDown();
}
});
-
+
if (!latch.await(60, TimeUnit.SECONDS))
{
- log.warn("Timed out on waiting PagingStore " + this.address +
" to shutdown");
+ log.warn("Timed out on waiting PagingStore " + this.address +
" to shutdown");
}
if (currentPage != null)
@@ -726,7 +727,7 @@
}
}
- private void addSize(final long size)
+ private void addSize(final long size)
{
if (addressFullMessagePolicy != AddressFullMessagePolicy.PAGE)
{
@@ -948,55 +949,67 @@
final long transactionIdDuringPaging = pagedMessage.getTransactionID();
+ postOffice.route(message, depageTransaction);
+
+ // A ROLLBACK_ONLY depage transaction means the page is a duplicate, so we need to ignore it
+ if (depageTransaction.getState() == State.ROLLBACK_ONLY)
+ {
+ break;
+ }
+
+ PageTransactionInfo pageUserTransaction = null;
+
if (transactionIdDuringPaging >= 0)
{
- final PageTransactionInfo pageTransactionInfo =
pagingManager.getTransaction(transactionIdDuringPaging);
+ pageUserTransaction =
pagingManager.getTransaction(transactionIdDuringPaging);
- if (pageTransactionInfo == null)
+ if (pageUserTransaction == null)
{
- log.warn("Transaction " + pagedMessage.getTransactionID() +
- " used during paging not found, ignoring message " +
- message);
+ // This is not supposed to happen
+ log.warn("Transaction " + pagedMessage.getTransactionID() +
" used during paging not found");
continue;
}
+ else
+ {
- // This is to avoid a race condition where messages are depaged
- // before the commit arrived
+ // This is to avoid a race condition where messages are depaged
+ // before the commit arrived
- while (running && !pageTransactionInfo.waitCompletion(500))
- {
- // This is just to give us a chance to interrupt the process..
- // if we start a shutdown in the middle of transactions, the
commit/rollback may never come, delaying
- // the shutdown of the server
- if (isTrace)
+ while (running && !pageUserTransaction.waitCompletion(500))
{
- trace("Waiting pageTransaction to complete");
+ // This is just to give us a chance to interrupt the process..
+ // if we start a shutdown in the middle of transactions, the
commit/rollback may never come, delaying
+ // the shutdown of the server
+ if (isTrace)
+ {
+ trace("Waiting pageTransaction to complete");
+ }
}
- }
- if (!running)
- {
- break;
- }
+ if (!running)
+ {
+ break;
+ }
- if (!pageTransactionInfo.isCommit())
- {
- if (isTrace)
+ if (!pageUserTransaction.isCommit())
{
- trace("Rollback was called after prepare, ignoring message "
+ message);
+ if (isTrace)
+ {
+ trace("Rollback was called after prepare, ignoring message
" + message);
+ }
+ continue;
}
- continue;
}
- // Update information about transactions
- if (message.isDurable())
- {
- pageTransactionInfo.decrement();
- pageTransactionsToUpdate.add(pageTransactionInfo);
- }
}
- postOffice.route(message, depageTransaction);
+ // Update information about transactions
+ // This needs to be done after routing because of duplicate detection
+ if (pageUserTransaction != null && message.isDurable())
+ {
+ pageUserTransaction.decrement();
+ pageTransactionsToUpdate.add(pageUserTransaction);
+ }
}
if (!running)
@@ -1023,7 +1036,7 @@
}
depageTransaction.commit();
-
+
storageManager.waitOnOperations();
if (isTrace)
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
---
trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-12-05
19:37:17 UTC (rev 8585)
+++
trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-12-06
06:58:34 UTC (rev 8586)
@@ -95,6 +95,9 @@
private final ConcurrentMap<SimpleString, ConcurrentMap<Integer, Page>>
pageIndex = new ConcurrentHashMap<SimpleString, ConcurrentMap<Integer,
Page>>();
private final ConcurrentMap<Long, LargeServerMessage> largeMessages = new
ConcurrentHashMap<Long, LargeServerMessage>();
+
+ // Used in tests only, to simulate failures when deleting pages
+ private boolean deletePages = true;
// Constructors --------------------------------------------------
public ReplicationEndpointImpl(final HornetQServer server)
@@ -285,6 +288,12 @@
}
}
+
+ /** Used in tests only, to simulate missing page deletes. */
+ public void setDeletePages(final boolean deletePages)
+ {
+ this.deletePages = deletePages;
+ }
/**
* @param journalInformation
@@ -504,7 +513,10 @@
{
if (packet.isDelete())
{
- page.delete();
+ if (deletePages)
+ {
+ page.delete();
+ }
}
else
{
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-12-05 19:37:17
UTC (rev 8585)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-12-06 06:58:34
UTC (rev 8586)
@@ -854,6 +854,12 @@
// Public
//
---------------------------------------------------------------------------------------
+ /** For tests only */
+ public ReplicationEndpoint getReplicationEndpoint()
+ {
+ return this.replicationEndpoint;
+ }
+
// Package protected
// ----------------------------------------------------------------------------
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
---
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2009-12-05
19:37:17 UTC (rev 8585)
+++
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/PagingFailoverTest.java 2009-12-06
06:58:34 UTC (rev 8586)
@@ -17,7 +17,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
-import org.hornetq.core.buffers.HornetQBuffers;
import org.hornetq.core.client.ClientConsumer;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientProducer;
@@ -29,7 +28,9 @@
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.replication.impl.ReplicationEndpointImpl;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.impl.HornetQServerImpl;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.utils.SimpleString;
@@ -120,10 +121,20 @@
session.commit();
+ ReplicationEndpointImpl endpoint = null;
+
if (failBeforeConsume)
{
failSession(session, latch);
}
+ else
+ {
+ endpoint =
(ReplicationEndpointImpl)((HornetQServerImpl)server1Service).getReplicationEndpoint();
+ if (endpoint != null)
+ {
+ endpoint.setDeletePages(false);
+ }
+ }
session.start();
@@ -145,6 +156,11 @@
session.commit();
+ if (endpoint != null)
+ {
+ endpoint.setDeletePages(true);
+ }
+
if (!failBeforeConsume)
{
failSession(session, latch);
Modified: trunk/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java 2009-12-05
19:37:17 UTC (rev 8585)
+++ trunk/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java 2009-12-06
06:58:34 UTC (rev 8586)
@@ -69,18 +69,31 @@
public void testCrashDuringDeleteFile() throws Exception
{
- pageAndFail();
+ doTestCrashDuringDeleteFile(false);
+ }
+ public void testCrashDuringDeleteFileTransacted() throws Exception
+ {
+ doTestCrashDuringDeleteFile(true);
+ }
+
+ public void doTestCrashDuringDeleteFile(final boolean transacted) throws Exception
+ {
+ pageAndFail(transacted);
+
File pageDir = new File(getPageDir());
File directories[] = pageDir.listFiles();
assertEquals(1, directories.length);
- // When depage happened, a new empty page was supposed to be opened, what will
create 3 files
- assertEquals("Missing a file, supposed to have address.txt, 1st page and 2nd
page",
- 3,
- directories[0].list().length);
+ if (!transacted)
+ {
+ // When depage happens, a new empty page is supposed to be opened, which will
create 3 files
+ assertEquals("Missing a file, supposed to have address.txt, 1st page and
2nd page",
+ 3,
+ directories[0].list().length);
+ }
Configuration config = createDefaultConfig();
@@ -122,7 +135,7 @@
/** This method will leave garbage on paging.
* It will not delete page files as if the server crashed right after commit,
* and before removing the file*/
- private void pageAndFail() throws Exception
+ private void pageAndFail(final boolean transacted) throws Exception
{
clearData();
Configuration config = createDefaultConfig();
@@ -142,7 +155,7 @@
sf.setBlockOnPersistentSend(true);
sf.setBlockOnAcknowledge(true);
- ClientSession session = sf.createSession(null, null, false, true, true, false,
0);
+ ClientSession session = sf.createSession(null, null, false, !transacted,
!transacted, false, 0);
session.createQueue(ADDRESS, ADDRESS, null, true);
@@ -150,7 +163,7 @@
ClientMessage message = session.createClientMessage(true);
message.getBodyBuffer().writeBytes(new byte[1024]);
-
+
PagingStore store =
server.getPostOffice().getPagingManager().getPageStore(ADDRESS);
int messages = 0;
@@ -158,6 +171,10 @@
{
producer.send(message);
messages++;
+ if (transacted && messages % 100 == 0)
+ {
+ session.commit();
+ }
}
for (int i = 0; i < 2; i++)
@@ -166,6 +183,8 @@
producer.send(message);
}
+ session.commit();
+
session.close();
assertTrue(server.getPostOffice().getPagingManager().getTotalMemory() > 0);