[hornetq-commits] JBoss hornetq SVN: r9607 - in branches/Branch_2_1: src/main/org/hornetq/core/paging and 8 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Sun Aug 29 16:01:45 EDT 2010
Author: clebert.suconic at jboss.com
Date: 2010-08-29 16:01:42 -0400 (Sun, 29 Aug 2010)
New Revision: 9607
Modified:
branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/Branch_2_1/src/main/org/hornetq/core/paging/PageTransactionInfo.java
branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Branch_2_1/src/main/org/hornetq/core/persistence/StorageManager.java
branches/Branch_2_1/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_2_1/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/Branch_2_1/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/Branch_2_1/tests/src/org/hornetq/tests/integration/client/PagingTest.java
branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-497 & https://jira.jboss.org/browse/HORNETQ-485 - Fixes on Paging and Journal
Modified: branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-27 16:44:17 UTC (rev 9606)
+++ branches/Branch_2_1/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-08-29 20:01:42 UTC (rev 9607)
@@ -108,7 +108,7 @@
private static final void traceRecord(final String message)
{
- System.out.println(message);
+ JournalImpl.log.trace(message);
}
// The sizes of primitive types
@@ -838,7 +838,11 @@
if (JournalImpl.TRACE_RECORDS)
{
- JournalImpl.traceRecord("appendAddRecord::id=" + id + ", usedFile = " + usedFile);
+ JournalImpl.traceRecord("appendAddRecord::id=" + id +
+ ", userRecordType=" +
+ recordType +
+ ", usedFile = " +
+ usedFile);
}
records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize()));
@@ -919,7 +923,11 @@
if (JournalImpl.TRACE_RECORDS)
{
- JournalImpl.traceRecord("appendUpdateRecord::id=" + id + ", usedFile = " + usedFile);
+ JournalImpl.traceRecord("appendUpdateRecord::id=" + id +
+ ", userRecordType=" +
+ recordType +
+ ", usedFile = " +
+ usedFile);
}
// record== null here could only mean there is a compactor, and computing the delete should be done after
@@ -1060,6 +1068,8 @@
JournalImpl.traceRecord("appendAddRecordTransactional:txID=" + txID +
",id=" +
id +
+ ", userRecordType=" +
+ recordType +
", usedFile = " +
usedFile);
}
@@ -1113,6 +1123,8 @@
JournalImpl.traceRecord("appendUpdateRecordTransactional::txID=" + txID +
",id=" +
id +
+ ", userRecordType=" +
+ recordType +
", usedFile = " +
usedFile);
}
@@ -2105,6 +2117,8 @@
filesRepository.pushOpenedFile();
+ state = JournalImpl.STATE_LOADED;
+
for (TransactionHolder transaction : loadTransactions.values())
{
if (!transaction.prepared || transaction.invalid)
@@ -2112,19 +2126,9 @@
JournalImpl.log.warn("Uncommitted transaction with id " + transaction.transactionID +
" found and discarded");
- JournalTransaction transactionInfo = transactions.get(transaction.transactionID);
+ // I append a rollback record here, because otherwise compacting will be throwing messages because of unknown transactions
+ this.appendRollbackRecord(transaction.transactionID, false);
- if (transactionInfo == null)
- {
- throw new IllegalStateException("Cannot find tx " + transaction.transactionID);
- }
-
- // Reverse the refs
- transactionInfo.forget();
-
- // Remove the transactionInfo
- transactions.remove(transaction.transactionID);
-
loadManager.failedTransaction(transaction.transactionID,
transaction.recordInfos,
transaction.recordsToDelete);
@@ -2149,8 +2153,6 @@
}
}
- state = JournalImpl.STATE_LOADED;
-
checkReclaimStatus();
return new JournalLoadInformation(records.size(), maxID.longValue());
Modified: branches/Branch_2_1/src/main/org/hornetq/core/paging/PageTransactionInfo.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/paging/PageTransactionInfo.java 2010-08-27 16:44:17 UTC (rev 9606)
+++ branches/Branch_2_1/src/main/org/hornetq/core/paging/PageTransactionInfo.java 2010-08-29 20:01:42 UTC (rev 9607)
@@ -14,6 +14,8 @@
package org.hornetq.core.paging;
import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.transaction.Transaction;
/**
*
@@ -37,10 +39,15 @@
void setRecordID(long id);
long getTransactionID();
+
+ void store(StorageManager storageManager,Transaction tx) throws Exception;
+
+ void storeUpdate(StorageManager storageManager, Transaction tx, int depages) throws Exception;
- int increment();
+ // To be used after the update was stored or reload
+ void update(int update, StorageManager storageManager);
- int decrement();
+ void increment();
int getNumberOfMessages();
Modified: branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-08-27 16:44:17 UTC (rev 9606)
+++ branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PageTransactionInfoImpl.java 2010-08-29 20:01:42 UTC (rev 9607)
@@ -18,7 +18,11 @@
import java.util.concurrent.atomic.AtomicInteger;
import org.hornetq.api.core.HornetQBuffer;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PageTransactionInfo;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.utils.DataConstants;
/**
@@ -30,11 +34,13 @@
{
// Constants -----------------------------------------------------
+ private static final Logger log = Logger.getLogger(PageTransactionInfoImpl.class);
+
// Attributes ----------------------------------------------------
private long transactionID;
- private volatile long recordID;
+ private volatile long recordID = -1;
private volatile CountDownLatch countDownCompleted;
@@ -42,7 +48,7 @@
private volatile boolean rolledback;
- private final AtomicInteger numberOfMessages = new AtomicInteger(0);
+ private AtomicInteger numberOfMessages = new AtomicInteger(0);
// Static --------------------------------------------------------
@@ -75,21 +81,25 @@
return transactionID;
}
- public int increment()
+ public void update(final int update, final StorageManager storageManager)
{
- return numberOfMessages.incrementAndGet();
+ int sizeAfterUpdate = numberOfMessages.addAndGet(-update);
+ if (sizeAfterUpdate == 0 && storageManager != null)
+ {
+ try
+ {
+ storageManager.deletePageTransactional(this.recordID);
+ }
+ catch (Exception e)
+ {
+ log.warn("Can't delete page transaction id=" + this.recordID);
+ }
+ }
}
- public int decrement()
+ public void increment()
{
- final int value = numberOfMessages.decrementAndGet();
-
- if (value < 0)
- {
- throw new IllegalStateException("Internal error Negative value on Paging transactions!");
- }
-
- return value;
+ numberOfMessages.incrementAndGet();
}
public int getNumberOfMessages()
@@ -103,10 +113,8 @@
{
transactionID = buffer.readLong();
numberOfMessages.set(buffer.readInt());
- countDownCompleted = null; // if it is being readed, probably it was
- // committed
- committed = true; // Unless it is a incomplete prepare, which is marked by
- // markIcomplete
+ countDownCompleted = null;
+ committed = true;
}
public synchronized void encode(final HornetQBuffer buffer)
@@ -141,6 +149,49 @@
}
}
+ public void store(final StorageManager storageManager, final Transaction tx) throws Exception
+ {
+ storageManager.storePageTransaction(tx.getID(), this);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.paging.PageTransactionInfo#storeUpdate(org.hornetq.core.persistence.StorageManager, org.hornetq.core.transaction.Transaction, int)
+ */
+ public void storeUpdate(final StorageManager storageManager, final Transaction tx, final int depages) throws Exception
+ {
+ storageManager.updatePageTransaction(tx.getID(), this, depages);
+
+ final PageTransactionInfo pgToUpdate = this;
+
+ tx.addOperation(new TransactionOperation()
+ {
+ public void beforeRollback(Transaction tx) throws Exception
+ {
+ }
+
+ public void beforePrepare(Transaction tx) throws Exception
+ {
+ }
+
+ public void beforeCommit(Transaction tx) throws Exception
+ {
+ }
+
+ public void afterRollback(Transaction tx)
+ {
+ }
+
+ public void afterPrepare(Transaction tx)
+ {
+ }
+
+ public void afterCommit(Transaction tx)
+ {
+ pgToUpdate.update(depages, storageManager);
+ }
+ });
+ }
+
public boolean isCommit()
{
return committed;
@@ -166,6 +217,16 @@
countDownCompleted = new CountDownLatch(1);
}
+ public String toString()
+ {
+ return "PageTransactionInfoImpl(transactionID=" + transactionID +
+ ",id=" +
+ recordID +
+ ",numberOfMessages=" +
+ numberOfMessages +
+ ")";
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-08-27 16:44:17 UTC (rev 9606)
+++ branches/Branch_2_1/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2010-08-29 20:01:42 UTC (rev 9607)
@@ -14,8 +14,9 @@
package org.hornetq.core.paging.impl;
import java.text.DecimalFormat;
-import java.util.HashSet;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
@@ -958,7 +959,7 @@
depageTransaction.putProperty(TransactionPropertyIndexes.IS_DEPAGE, Boolean.valueOf(true));
- HashSet<PageTransactionInfo> pageTransactionsToUpdate = new HashSet<PageTransactionInfo>();
+ HashMap<PageTransactionInfo, AtomicInteger> pageTransactionsToUpdate = new HashMap<PageTransactionInfo, AtomicInteger>();
for (PagedMessage pagedMessage : pagedMessages)
{
@@ -978,6 +979,7 @@
final long transactionIdDuringPaging = pagedMessage.getTransactionID();
PageTransactionInfo pageUserTransaction = null;
+ AtomicInteger countPageTX = null;
if (transactionIdDuringPaging >= 0)
{
@@ -992,6 +994,12 @@
}
else
{
+ countPageTX = pageTransactionsToUpdate.get(pageUserTransaction);
+ if (countPageTX == null)
+ {
+ countPageTX = new AtomicInteger();
+ pageTransactionsToUpdate.put(pageUserTransaction, countPageTX);
+ }
// This is to avoid a race condition where messages are depaged
// before the commit arrived
@@ -1036,8 +1044,7 @@
// This needs to be done after routing because of duplication detection
if (pageUserTransaction != null && message.isDurable())
{
- pageUserTransaction.decrement();
- pageTransactionsToUpdate.add(pageUserTransaction);
+ countPageTX.incrementAndGet();
}
}
@@ -1047,21 +1054,12 @@
return false;
}
- for (PageTransactionInfo pageWithTransaction : pageTransactionsToUpdate)
+ for (Map.Entry<PageTransactionInfo, AtomicInteger> entry : pageTransactionsToUpdate.entrySet())
{
// This will set the journal transaction to commit;
depageTransaction.setContainsPersistent();
-
- if (pageWithTransaction.getNumberOfMessages() == 0)
- {
- // numberOfReads==numberOfWrites -> We delete the record
- storageManager.deletePageTransactional(depageTransaction.getID(), pageWithTransaction.getRecordID());
- pagingManager.removeTransaction(pageWithTransaction.getTransactionID());
- }
- else
- {
- storageManager.storePageTransaction(depageTransaction.getID(), pageWithTransaction);
- }
+
+ entry.getKey().storeUpdate(storageManager, depageTransaction, entry.getValue().intValue());
}
depageTransaction.commit();
Modified: branches/Branch_2_1/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/persistence/StorageManager.java 2010-08-27 16:44:17 UTC (rev 9606)
+++ branches/Branch_2_1/src/main/org/hornetq/core/persistence/StorageManager.java 2010-08-29 20:01:42 UTC (rev 9607)
@@ -134,8 +134,10 @@
void rollback(long txID) throws Exception;
void storePageTransaction(long txID, PageTransactionInfo pageTransaction) throws Exception;
+
+ void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depage) throws Exception;
- void deletePageTransactional(long txID, long recordID) throws Exception;
+ void deletePageTransactional(long recordID) throws Exception;
/** This method is only useful at the backup side. We only load internal structures making the journals ready for
* append mode on the backup side. */
Modified: branches/Branch_2_1/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-08-27 16:44:17 UTC (rev 9606)
+++ branches/Branch_2_1/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2010-08-29 20:01:42 UTC (rev 9607)
@@ -575,13 +575,6 @@
public void storePageTransaction(final long txID, final PageTransactionInfo pageTransaction) throws Exception
{
- if (pageTransaction.getRecordID() != 0)
- {
- // Instead of updating the record, we delete the old one as that is
- // better for reclaiming
- messageJournal.appendDeleteRecordTransactional(txID, pageTransaction.getRecordID());
- }
-
pageTransaction.setRecordID(generateUniqueID());
messageJournal.appendAddRecordTransactional(txID,
@@ -590,6 +583,11 @@
pageTransaction);
}
+ public void updatePageTransaction(final long txID, final PageTransactionInfo pageTransaction, final int depages) throws Exception
+ {
+ messageJournal.appendUpdateRecordTransactional(txID, pageTransaction.getRecordID(), JournalStorageManager.PAGE_TRANSACTION, new PageUpdateTXEncoding(pageTransaction.getTransactionID(), depages));
+ }
+
public void storeReferenceTransactional(final long txID, final long queueID, final long messageID) throws Exception
{
messageJournal.appendUpdateRecordTransactional(txID,
@@ -623,9 +621,9 @@
messageJournal.appendDeleteRecord(id, true, getContext(true));
}
- public void deletePageTransactional(final long txID, final long recordID) throws Exception
+ public void deletePageTransactional(final long recordID) throws Exception
{
- messageJournal.appendDeleteRecordTransactional(txID, recordID);
+ messageJournal.appendDeleteRecord(recordID, false);
}
public void updateScheduledDeliveryTimeTransactional(final long txID, final MessageReference ref) throws Exception
@@ -907,14 +905,27 @@
}
case PAGE_TRANSACTION:
{
- PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
+ if (record.isUpdate)
+ {
+ PageUpdateTXEncoding pageUpdate = new PageUpdateTXEncoding();
+
+ pageUpdate.decode(buff);
+
+ PageTransactionInfo pageTX = pagingManager.getTransaction(pageUpdate.pageTX);
+
+ pageTX.update(pageUpdate.recods, null);
+ }
+ else
+ {
+ PageTransactionInfoImpl pageTransactionInfo = new PageTransactionInfoImpl();
+
+ pageTransactionInfo.decode(buff);
+
+ pageTransactionInfo.setRecordID(record.id);
+
+ pagingManager.addTransaction(pageTransactionInfo);
+ }
- pageTransactionInfo.decode(buff);
-
- pageTransactionInfo.setRecordID(record.id);
-
- pagingManager.addTransaction(pageTransactionInfo);
-
break;
}
case SET_SCHEDULED_DELIVERY_TIME:
@@ -2007,7 +2018,49 @@
super(queueID);
}
}
+
+ private static class PageUpdateTXEncoding implements EncodingSupport
+ {
+
+ public long pageTX;
+
+ public int recods;
+
+ public PageUpdateTXEncoding()
+ {
+ }
+
+ public PageUpdateTXEncoding(final long pageTX, final int records)
+ {
+ this.pageTX = pageTX;
+ this.recods = records;
+ }
+
+ public void decode(HornetQBuffer buffer)
+ {
+ this.pageTX = buffer.readLong();
+ this.recods = buffer.readInt();
+ }
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#encode(org.hornetq.api.core.HornetQBuffer)
+ */
+ public void encode(HornetQBuffer buffer)
+ {
+ buffer.writeLong(pageTX);
+ buffer.writeInt(recods);
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.EncodingSupport#getEncodeSize()
+ */
+ public int getEncodeSize()
+ {
+ return DataConstants.SIZE_LONG + DataConstants.SIZE_INT;
+ }
+
+ }
+
private static class ScheduledDeliveryEncoding extends QueueEncoding
{
long scheduledDeliveryTime;
Modified: branches/Branch_2_1/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2010-08-27 16:44:17 UTC (rev 9606)
+++ branches/Branch_2_1/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2010-08-29 20:01:42 UTC (rev 9607)
@@ -436,4 +436,18 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#deletePageTransactional(long)
+ */
+ public void deletePageTransactional(long recordID) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#updatePageTransaction(long, org.hornetq.core.paging.PageTransactionInfo, int)
+ */
+ public void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depage) throws Exception
+ {
+ }
+
}
Modified: branches/Branch_2_1/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_2_1/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-08-27 16:44:17 UTC (rev 9606)
+++ branches/Branch_2_1/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-08-29 20:01:42 UTC (rev 9607)
@@ -58,9 +58,9 @@
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.Transaction;
-import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
+import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.utils.TypedProperties;
import org.hornetq.utils.UUIDGenerator;
@@ -1242,7 +1242,7 @@
store.sync();
}
- storageManager.storePageTransaction(tx.getID(), pageTransaction);
+ pageTransaction.store(storageManager, tx);
}
}
}
Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-08-27 16:44:17 UTC (rev 9606)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-08-29 20:01:42 UTC (rev 9607)
@@ -14,8 +14,10 @@
package org.hornetq.tests.integration.client;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
import junit.framework.AssertionFailedError;
@@ -30,6 +32,7 @@
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.MessageHandler;
import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.DivertConfiguration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.impl.TestSupportPageStore;
import org.hornetq.core.server.HornetQServer;
@@ -38,7 +41,6 @@
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.tests.util.UnitTestCase;
-import org.hornetq.utils.DataConstants;
/**
* A PagingTest
@@ -86,12 +88,192 @@
internaltestSendReceivePaging(true);
}
-
public void testSendReceivePagingNonPersistent() throws Exception
{
internaltestSendReceivePaging(false);
}
+ public void testWithDiverts() throws Exception
+ {
+ clearData();
+
+ Configuration config = createDefaultConfig();
+
+ HornetQServer server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+
+ DivertConfiguration divert1 = new DivertConfiguration("dv1",
+ "nm1",
+ PagingTest.ADDRESS.toString(),
+ PagingTest.ADDRESS.toString() + "-1",
+ true,
+ null,
+ null);
+
+ DivertConfiguration divert2 = new DivertConfiguration("dv2",
+ "nm2",
+ PagingTest.ADDRESS.toString(),
+ PagingTest.ADDRESS.toString() + "-2",
+ true,
+ null,
+ null);
+
+ ArrayList<DivertConfiguration> divertList = new ArrayList<DivertConfiguration>();
+ divertList.add(divert1);
+ divertList.add(divert2);
+
+ config.setDivertConfigurations(divertList);
+
+ server.start();
+
+ final int numberOfIntegers = 256;
+
+ final int numberOfMessages = 30000;
+
+ final byte[] body = new byte[numberOfIntegers * 4];
+
+ ByteBuffer bb = ByteBuffer.wrap(body);
+
+ for (int j = 1; j <= numberOfIntegers; j++)
+ {
+ bb.putInt(j);
+ }
+
+ try
+ {
+ {
+ ClientSessionFactory sf = createInVMFactory();
+
+ sf.setBlockOnNonDurableSend(true);
+ sf.setBlockOnDurableSend(true);
+ sf.setBlockOnAcknowledge(true);
+
+ ClientSession session = sf.createSession(false, false, false);
+
+ session.createQueue(PagingTest.ADDRESS + "-1", PagingTest.ADDRESS + "-1", null, true);
+
+ session.createQueue(PagingTest.ADDRESS + "-2", PagingTest.ADDRESS + "-2", null, true);
+
+ ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
+
+ ClientMessage message = null;
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ message = session.createMessage(true);
+
+ HornetQBuffer bodyLocal = message.getBodyBuffer();
+
+ bodyLocal.writeBytes(body);
+
+ message.putIntProperty(new SimpleString("id"), i);
+
+ producer.send(message);
+ }
+
+ session.commit();
+
+ session.close();
+
+ server.stop();
+ }
+
+ server = createServer(true,
+ config,
+ PagingTest.PAGE_SIZE,
+ PagingTest.PAGE_MAX,
+ new HashMap<String, AddressSettings>());
+ server.start();
+
+ final ClientSessionFactory sf2 = createInVMFactory();
+
+ final AtomicInteger errors = new AtomicInteger(0);
+
+ Thread threads[] = new Thread[2];
+
+ for (int start = 1; start <= 2; start++)
+ {
+
+ final String addressToSubscribe = PagingTest.ADDRESS + "-" + start;
+
+ threads[start - 1] = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ ClientSession session = sf2.createSession(null, null, false, true, true, false, 0);
+
+ ClientConsumer consumer = session.createConsumer(addressToSubscribe);
+
+ session.start();
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message2 = consumer.receive(PagingTest.RECEIVE_TIMEOUT);
+
+ Assert.assertNotNull(message2);
+
+ Assert.assertEquals(i, message2.getIntProperty("id").intValue());
+
+ message2.acknowledge();
+
+ Assert.assertNotNull(message2);
+
+ session.commit();
+
+ try
+ {
+ assertBodiesEqual(body, message2.getBodyBuffer());
+ }
+ catch (AssertionFailedError e)
+ {
+ PagingTest.log.info("Expected buffer:" + UnitTestCase.dumbBytesHex(body, 40));
+ PagingTest.log.info("Arriving buffer:" + UnitTestCase.dumbBytesHex(message2.getBodyBuffer()
+ .toByteBuffer()
+ .array(), 40));
+ throw e;
+ }
+ }
+
+ consumer.close();
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ errors.incrementAndGet();
+ }
+
+ }
+ };
+ }
+
+ for (int i = 0; i < 2; i++)
+ {
+ threads[i].start();
+ }
+
+ for (int i = 0; i < 2; i++)
+ {
+ threads[i].join();
+ }
+ }
+ finally
+ {
+ try
+ {
+ server.stop();
+ }
+ catch (Throwable ignored)
+ {
+ }
+ }
+
+ }
+
private void internaltestSendReceivePaging(final boolean persistentMessages) throws Exception
{
clearData();
@@ -150,11 +332,10 @@
session.close();
-
if (persistentMessages)
{
server.stop();
-
+
server = createServer(true,
config,
PagingTest.PAGE_SIZE,
@@ -360,8 +541,7 @@
}
}
-
-
+
/**
* - Make a destination in page mode
* - Add stuff to a transaction
@@ -396,7 +576,7 @@
sf.setBlockOnNonDurableSend(true);
sf.setBlockOnDurableSend(true);
sf.setBlockOnAcknowledge(true);
-
+
byte[] body = new byte[messageSize];
ClientSession sessionTransacted = sf.createSession(null, null, false, false, false, false, 0);
@@ -408,7 +588,7 @@
ClientMessage firstMessage = sessionTransacted.createMessage(IS_DURABLE_MESSAGE);
firstMessage.getBodyBuffer().writeBytes(body);
firstMessage.putIntProperty(new SimpleString("id"), 0);
-
+
producerTransacted.send(firstMessage);
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
@@ -479,7 +659,7 @@
Integer messageID = (Integer)message.getObjectProperty(new SimpleString("id"));
-// System.out.println(messageID);
+ // System.out.println(messageID);
Assert.assertNotNull(messageID);
Assert.assertEquals("message received out of order", i, messageID.intValue());
@@ -505,8 +685,6 @@
}
-
-
public void testPageOnSchedulingNoRestart() throws Exception
{
internalTestPageOnScheduling(false);
@@ -756,8 +934,6 @@
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
ClientMessage message = null;
-
-
for (int i = 0; i < numberOfMessages; i++)
{
@@ -955,7 +1131,7 @@
}
}
-
+
public void testDropMessagesExpiring() throws Exception
{
clearData();
@@ -969,7 +1145,7 @@
settings.put(PagingTest.ADDRESS.toString(), set);
- HornetQServer server = createServer(true, config, 1024, 1024 * 1024, settings);
+ HornetQServer server = createServer(true, config, 1024, 1024 * 1024, settings);
server.start();
@@ -978,7 +1154,7 @@
try
{
ClientSessionFactory sf = createInVMFactory();
-
+
sf.setAckBatchSize(0);
ClientSession session = sf.createSession();
@@ -988,7 +1164,7 @@
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
ClientMessage message = null;
-
+
class MyHandler implements MessageHandler
{
int count;
@@ -1001,16 +1177,16 @@
}
catch (Exception e)
{
-
+
}
-
+
count++;
-
+
if (count % 1000 == 0)
{
log.info("received " + count);
}
-
+
try
{
message.acknowledge();
@@ -1019,13 +1195,13 @@
{
e.printStackTrace();
}
- }
+ }
}
-
+
ClientConsumer consumer = session.createConsumer(PagingTest.ADDRESS);
session.start();
-
+
consumer.setMessageHandler(new MyHandler());
for (int i = 0; i < numberOfMessages; i++)
@@ -1034,12 +1210,12 @@
message = session.createMessage(false);
message.getBodyBuffer().writeBytes(body);
-
+
message.setExpiration(System.currentTimeMillis() + 100);
producer.send(message);
}
-
+
session.close();
}
finally
Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2010-08-27 16:44:17 UTC (rev 9606)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/journal/impl/AlignedJournalImplTest.java 2010-08-29 20:01:42 UTC (rev 9607)
@@ -463,8 +463,12 @@
{
AlignedJournalImplTest.log.debug("Expected exception " + e, e);
}
+
setupAndLoadJournal(JOURNAL_SIZE, 100);
+
+ journalImpl.forceMoveNextFile();
+ journalImpl.checkReclaimStatus();
Assert.assertEquals(0, records.size());
Assert.assertEquals(0, transactions.size());
Modified: branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-08-27 16:44:17 UTC (rev 9606)
+++ branches/Branch_2_1/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2010-08-29 20:01:42 UTC (rev 9607)
@@ -125,22 +125,6 @@
Assert.assertEquals(nr1, trans2.getNumberOfMessages());
- for (int i = 0; i < nr1; i++)
- {
- trans.decrement();
- }
-
- Assert.assertEquals(0, trans.getNumberOfMessages());
-
- try
- {
- trans.decrement();
- Assert.fail("Exception expected!");
- }
- catch (Throwable ignored)
- {
- }
-
}
public void testDoubleStart() throws Exception
@@ -1355,6 +1339,20 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#deletePageTransactional(long)
+ */
+ public void deletePageTransactional(long recordID) throws Exception
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#updatePageTransaction(long, org.hornetq.core.paging.PageTransactionInfo, int)
+ */
+ public void updatePageTransaction(long txID, PageTransactionInfo pageTransaction, int depage) throws Exception
+ {
+ }
+
}
class FakeStoreFactory implements PagingStoreFactory
More information about the hornetq-commits
mailing list