[hornetq-commits] JBoss hornetq SVN: r10366 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/persistence/impl/journal and 3 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Fri Mar 25 15:31:43 EDT 2011
Author: clebert.suconic at jboss.com
Date: 2011-03-25 15:31:43 -0400 (Fri, 25 Mar 2011)
New Revision: 10366
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/MessageReference.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/LastValueQueue.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java
Log:
https://issues.jboss.org/browse/JBPAPP-6170 - Redelivery counters infinite updates
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2011-03-25 13:17:31 UTC (rev 10365)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/paging/cursor/PagedReferenceImpl.java 2011-03-25 19:31:43 UTC (rev 10366)
@@ -37,9 +37,11 @@
private final PagePosition position;
private WeakReference<PagedMessage> message;
-
+
private Long deliveryTime = null;
-
+
+ private int persistedCount;
+
private final PageSubscription subscription;
public ServerMessage getMessage()
@@ -50,10 +52,10 @@
public synchronized PagedMessage getPagedMessage()
{
PagedMessage returnMessage = message != null ? message.get() : null;
-
+
// We only keep a few references on the Queue from paging...
// Besides those references are SoftReferenced on page cache...
- // So, this will unlikely be null,
+ // So, this will unlikely be null,
// unless the Queue has stalled for some time after paging
if (returnMessage == null)
{
@@ -69,7 +71,9 @@
return position;
}
- public PagedReferenceImpl(final PagePosition position, final PagedMessage message, final PageSubscription subscription)
+ public PagedReferenceImpl(final PagePosition position,
+ final PagedMessage message,
+ final PageSubscription subscription)
{
this.position = position;
this.message = new WeakReference<PagedMessage>(message);
@@ -80,6 +84,16 @@
{
return true;
}
+
+ public void setPersistedCount(int count)
+ {
+ this.persistedCount = count;
+ }
+
+ public int getPersistedCount()
+ {
+ return persistedCount;
+ }
/* (non-Javadoc)
* @see org.hornetq.core.server.MessageReference#copy(org.hornetq.core.server.Queue)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-03-25 13:17:31 UTC (rev 10365)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-03-25 19:31:43 UTC (rev 10366)
@@ -744,16 +744,22 @@
public void updateDeliveryCount(final MessageReference ref) throws Exception
{
- DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(),
- ref.getDeliveryCount());
+ // no need to store if it's the same value
+ // otherwise the journal will get OME in case of lots of redeliveries
+ if (ref.getDeliveryCount() != ref.getPersistedCount())
+ {
+ ref.setPersistedCount(ref.getDeliveryCount());
+ DeliveryCountUpdateEncoding updateInfo = new DeliveryCountUpdateEncoding(ref.getQueue().getID(),
+ ref.getDeliveryCount());
+
+ messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
+ JournalStorageManager.UPDATE_DELIVERY_COUNT,
+ updateInfo,
+
+ syncNonTransactional,
+ getContext(syncNonTransactional));
+ }
- messageJournal.appendUpdateRecord(ref.getMessage().getMessageID(),
- JournalStorageManager.UPDATE_DELIVERY_COUNT,
- updateInfo,
-
- syncNonTransactional,
- getContext(syncNonTransactional));
-
}
public void storeAddressSetting(PersistedAddressSetting addressSetting) throws Exception
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/MessageReference.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/MessageReference.java 2011-03-25 13:17:31 UTC (rev 10365)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/MessageReference.java 2011-03-25 19:31:43 UTC (rev 10366)
@@ -47,6 +47,10 @@
int getDeliveryCount();
void setDeliveryCount(int deliveryCount);
+
+ void setPersistedCount(int deliveryCount);
+
+ int getPersistedCount();
void incrementDeliveryCount();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-03-25 13:17:31 UTC (rev 10365)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-03-25 19:31:43 UTC (rev 10366)
@@ -645,18 +645,14 @@
}
- OperationContext formerCtx = null;
// We close all the exception in an attempt to let any pending IO to finish
// to avoid scenarios where the send or ACK got to disk but the response didn't get to the client
// It may still be possible to have this scenario on a real failure (without the use of XA)
// But at least we will do our best to avoid it on regular shutdowns
for (ServerSession session : sessions.values())
{
- storageManager.setContext(session.getSessionContext());
session.close(true);
}
-
- storageManager.setContext(formerCtx);
remotingService.stop();
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2011-03-25 13:17:31 UTC (rev 10365)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2011-03-25 19:31:43 UTC (rev 10366)
@@ -230,6 +230,16 @@
ref.setScheduledDeliveryTime(scheduledDeliveryTime);
}
+ public void setPersistedCount(int count)
+ {
+ ref.setPersistedCount(count);
+ }
+
+ public int getPersistedCount()
+ {
+ return ref.getPersistedCount();
+ }
+
public boolean isPaged()
{
return false;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java 2011-03-25 13:17:31 UTC (rev 10365)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java 2011-03-25 19:31:43 UTC (rev 10366)
@@ -36,6 +36,8 @@
private volatile int deliveryCount;
+ private volatile int persistedCount;
+
private volatile long scheduledDeliveryTime;
private final ServerMessage message;
@@ -91,6 +93,23 @@
}
// MessageReference implementation -------------------------------
+
+ /**
+ * @return the persistedCount
+ */
+ public int getPersistedCount()
+ {
+ return persistedCount;
+ }
+
+ /**
+ * @param persistedCount the persistedCount to set
+ */
+ public void setPersistedCount(int persistedCount)
+ {
+ this.persistedCount = persistedCount;
+ }
+
public MessageReference copy(final Queue queue)
{
return new MessageReferenceImpl(this, queue);
@@ -109,6 +128,7 @@
public void setDeliveryCount(final int deliveryCount)
{
this.deliveryCount = deliveryCount;
+ this.persistedCount = deliveryCount;
}
public void incrementDeliveryCount()
@@ -145,7 +165,7 @@
{
queue.referenceHandled();
}
-
+
public boolean isPaged()
{
return false;
@@ -167,7 +187,6 @@
queue.acknowledge(tx, this);
}
-
// Public --------------------------------------------------------
@Override
@@ -175,7 +194,9 @@
{
return "Reference[" + getMessage().getMessageID() +
"]:" +
- (getMessage().isDurable() ? "RELIABLE" : "NON-RELIABLE") + ":" + getMessage() ;
+ (getMessage().isDurable() ? "RELIABLE" : "NON-RELIABLE") +
+ ":" +
+ getMessage();
}
// Package protected ---------------------------------------------
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-03-25 13:17:31 UTC (rev 10365)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2011-03-25 19:31:43 UTC (rev 10366)
@@ -953,24 +953,35 @@
public void close(final boolean failed)
{
- storageManager.afterCompleteOperations(new IOAsyncTask()
+ OperationContext formerCtx = storageManager.getContext();
+
+ try
{
- public void onError(int errorCode, String errorMessage)
- {
- }
+ storageManager.setContext(sessionContext);
- public void done()
+ storageManager.afterCompleteOperations(new IOAsyncTask()
{
- try
+ public void onError(int errorCode, String errorMessage)
{
- doClose(failed);
}
- catch (Exception e)
+
+ public void done()
{
- log.error("Failed to close session", e);
+ try
+ {
+ doClose(failed);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to close session", e);
+ }
}
- }
- });
+ });
+ }
+ finally
+ {
+ storageManager.setContext(formerCtx);
+ }
}
public void closeConsumer(final long consumerID) throws Exception
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java 2011-03-25 13:17:31 UTC (rev 10365)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/RedeliveryConsumerTest.java 2011-03-25 19:31:43 UTC (rev 10366)
@@ -13,13 +13,27 @@
package org.hornetq.tests.integration.client;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
import junit.framework.Assert;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.journal.LoaderCallback;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.impl.JournalImpl;
+import org.hornetq.core.journal.impl.NIOSequentialFileFactory;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.tests.util.ServiceTestBase;
@@ -223,6 +237,111 @@
session.close();
}
+
+ public void testInfiniteDedeliveryMessageOnPersistent() throws Exception
+ {
+ internaltestInfiniteDedeliveryMessageOnPersistent(false);
+ }
+
+ private void internaltestInfiniteDedeliveryMessageOnPersistent(final boolean strict) throws Exception
+ {
+ setUp(strict);
+ ClientSession session = factory.createSession(false, false, false);
+
+ RedeliveryConsumerTest.log.info("created");
+
+ ClientProducer prod = session.createProducer(ADDRESS);
+ prod.send(createTextMessage(session, "Hello"));
+ session.commit();
+ session.close();
+
+
+ int expectedCount = 1;
+ for (int i = 0 ; i < 700; i++)
+ {
+ session = factory.createSession(false, false, false);
+ session.start();
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+ ClientMessage msg = consumer.receive(5000);
+ assertNotNull(msg);
+ assertEquals(expectedCount, msg.getDeliveryCount());
+
+ if (i % 100 == 0)
+ {
+ expectedCount++;
+ msg.acknowledge();
+ session.rollback();
+ }
+ session.close();
+ }
+
+ factory.close();
+ server.stop();
+
+ setUp(false);
+
+ for (int i = 0 ; i < 700; i++)
+ {
+ session = factory.createSession(false, false, false);
+ session.start();
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+ ClientMessage msg = consumer.receive(5000);
+ assertNotNull(msg);
+ assertEquals(expectedCount, msg.getDeliveryCount());
+ session.close();
+ }
+
+ server.stop();
+
+
+ JournalImpl journal = new JournalImpl(server.getConfiguration().getJournalFileSize(),
+ 2,
+ 0,
+ 0,
+ new NIOSequentialFileFactory(server.getConfiguration().getJournalDirectory()),
+ "hornetq-data",
+ "hq",
+ 1);
+
+
+ final AtomicInteger updates = new AtomicInteger();
+
+ journal.start();
+ journal.load(new LoaderCallback()
+ {
+
+ public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
+ {
+ }
+
+ public void updateRecord(RecordInfo info)
+ {
+ if (info.userRecordType == JournalStorageManager.UPDATE_DELIVERY_COUNT)
+ {
+ updates.incrementAndGet();
+ }
+ }
+
+ public void deleteRecord(long id)
+ {
+ }
+
+ public void addRecord(RecordInfo info)
+ {
+ }
+
+ public void addPreparedTransaction(PreparedTransactionInfo preparedTransaction)
+ {
+ }
+ });
+
+ journal.stop();
+
+
+ assertEquals(7, updates.get());
+
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
@@ -244,7 +363,14 @@
factory = locator.createSessionFactory();
ClientSession session = factory.createSession(false, false, false);
- session.createQueue(ADDRESS, ADDRESS, true);
+ try
+ {
+ session.createQueue(ADDRESS, ADDRESS, true);
+ }
+ catch (HornetQException expected)
+ {
+ // in case of restart
+ }
session.close();
}
More information about the hornetq-commits mailing list