[jboss-cvs] JBoss Messaging SVN: r4930 - in trunk: src/main/org/jboss/messaging/core/journal/impl and 9 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Sep 10 22:14:13 EDT 2008
Author: clebert.suconic at jboss.com
Date: 2008-09-10 22:14:12 -0400 (Wed, 10 Sep 2008)
New Revision: 4930
Modified:
trunk/src/main/org/jboss/messaging/core/journal/PreparedTransactionInfo.java
trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java
trunk/src/main/org/jboss/messaging/core/paging/PageTransactionInfo.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PageTransactionInfoImpl.java
trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java
Log:
JBMESSAGING-1299 - reloading & recovery Prepared Transactions.. Applying fixes on paging as well
Modified: trunk/src/main/org/jboss/messaging/core/journal/PreparedTransactionInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/PreparedTransactionInfo.java 2008-09-10 21:03:06 UTC (rev 4929)
+++ trunk/src/main/org/jboss/messaging/core/journal/PreparedTransactionInfo.java 2008-09-11 02:14:12 UTC (rev 4930)
@@ -24,9 +24,7 @@
package org.jboss.messaging.core.journal;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
/**
*
@@ -43,7 +41,7 @@
public final List<RecordInfo> records = new ArrayList<RecordInfo>();
- public final Set<RecordInfo> recordsToDelete = new HashSet<RecordInfo>();
+ public final List<RecordInfo> recordsToDelete = new ArrayList<RecordInfo>();
public PreparedTransactionInfo(final long id, final byte[] extraData)
{
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-09-10 21:03:06 UTC (rev 4929)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/JournalImpl.java 2008-09-11 02:14:12 UTC (rev 4930)
@@ -382,6 +382,11 @@
public long getTransactionID()
{
+ if (state != STATE_LOADED)
+ {
+ throw new IllegalStateException("Journal must be loaded first");
+ }
+
return transactionIDSequence.getAndIncrement();
}
Modified: trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java 2008-09-10 21:03:06 UTC (rev 4929)
+++ trunk/src/main/org/jboss/messaging/core/journal/impl/TransactionHolder.java 2008-09-11 02:14:12 UTC (rev 4930)
@@ -23,9 +23,7 @@
package org.jboss.messaging.core.journal.impl;
import java.util.ArrayList;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import org.jboss.messaging.core.journal.RecordInfo;
@@ -49,7 +47,7 @@
public final List<RecordInfo> recordInfos = new ArrayList<RecordInfo>();
- public final Set<RecordInfo> recordsToDelete = new HashSet<RecordInfo>();
+ public final ArrayList<RecordInfo> recordsToDelete = new ArrayList<RecordInfo>();
public boolean prepared;
Modified: trunk/src/main/org/jboss/messaging/core/paging/PageTransactionInfo.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/PageTransactionInfo.java 2008-09-10 21:03:06 UTC (rev 4929)
+++ trunk/src/main/org/jboss/messaging/core/paging/PageTransactionInfo.java 2008-09-11 02:14:12 UTC (rev 4930)
@@ -35,9 +35,11 @@
public interface PageTransactionInfo extends EncodingSupport
{
- void waitCompletion() throws InterruptedException;
+ boolean waitCompletion() throws Exception;
void complete();
+
+ void forget();
long getRecordID();
@@ -50,5 +52,7 @@
int decrement();
int getNumberOfMessages();
+
+ void markIncomplete();
}
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java 2008-09-10 21:03:06 UTC (rev 4929)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PageMessageImpl.java 2008-09-11 02:14:12 UTC (rev 4930)
@@ -49,7 +49,7 @@
// Public --------------------------------------------------------
private final ServerMessage message;
- private long transactionID;
+ private long transactionID = -1;
public PageMessageImpl(final ServerMessage message, final long transactionID)
{
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PageTransactionInfoImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PageTransactionInfoImpl.java 2008-09-10 21:03:06 UTC (rev 4929)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PageTransactionInfoImpl.java 2008-09-11 02:14:12 UTC (rev 4930)
@@ -45,6 +45,7 @@
private long transactionID;
private long recordID;
private CountDownLatch countDownCompleted;
+ private volatile boolean complete;
final AtomicInteger numberOfMessages = new AtomicInteger(0);
@@ -108,7 +109,8 @@
{
this.transactionID = buffer.getLong();
this.numberOfMessages.set(buffer.getInt());
- this.countDownCompleted = null; // if it is being readed, certainly it was committed
+ this.countDownCompleted = null; // if it is being readed, probably it was committed
+ this.complete = true; // Unless it is a incomplete prepare, which is marked by markIcomplete
}
public synchronized void encode(final MessagingBuffer buffer)
@@ -124,6 +126,7 @@
public void complete()
{
+ complete = true;
/**
* this is to avoid a race condition where the transaction still being committed another thread is depaging messages
*/
@@ -133,14 +136,29 @@
/**
* this is to avoid a race condition where the transaction still being committed another thread is depaging messages
*/
- public void waitCompletion() throws InterruptedException
+ public boolean waitCompletion() throws InterruptedException
{
if (countDownCompleted != null)
{
countDownCompleted.await();
}
+
+ return complete;
}
+ public void forget()
+ {
+ complete = false;
+
+ countDownCompleted.countDown();
+ }
+
+ public void markIncomplete()
+ {
+ complete = false;
+ countDownCompleted = new CountDownLatch(1);
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-09-10 21:03:06 UTC (rev 4929)
+++ trunk/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java 2008-09-11 02:14:12 UTC (rev 4930)
@@ -39,7 +39,6 @@
import org.jboss.messaging.core.paging.PagingStoreFactory;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.postoffice.impl.PostOfficeImpl;
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.settings.HierarchicalRepository;
@@ -79,7 +78,7 @@
// Static --------------------------------------------------------------------------------------------------------------------------
- private static final Logger log = Logger.getLogger(PostOfficeImpl.class);
+ private static final Logger log = Logger.getLogger(PagingManagerImpl.class);
//private static final boolean isTrace = log.isTraceEnabled();
private static final boolean isTrace = true;
@@ -153,7 +152,7 @@
*/
public boolean onDepage(int pageId, final SimpleString destination, PagingStore pagingStore, final PageMessage[] data) throws Exception
{
- log.info("Depaging....");
+ trace("Depaging....");
/// Depage has to be done atomically, in case of failure it should be back to where it was
final long depageTransactionID = storageManager.generateTransactionID();
@@ -184,7 +183,7 @@
for (PageMessage msg: data)
{
final long transactionIdDuringPaging = msg.getTransactionID();
- if (transactionIdDuringPaging > 0)
+ if (transactionIdDuringPaging >= 0)
{
final PageTransactionInfo pageTransactionInfo = transactions.get(transactionIdDuringPaging);
@@ -194,13 +193,17 @@
{
if (isTrace)
{
- trace("Transaction " + msg.getTransactionID() + " not found, ignoring message " + msg.getMessage().getMessageID());
+ trace("Transaction " + msg.getTransactionID() + " not found, ignoring message " + msg.getMessage());
}
continue;
}
// This is to avoid a race condition where messages are depaged before the commit arrived
- pageTransactionInfo.waitCompletion();
+ if (!pageTransactionInfo.waitCompletion())
+ {
+ trace("Rollback was called after prepare, ignoring message " + msg.getMessage());
+ continue;
+ }
/// Update information about transactions
if (msg.getMessage().isDurable())
@@ -248,7 +251,7 @@
public void setLastPage(LastPageRecord lastPage) throws Exception
{
- System.out.println("LastPage loaded was " + lastPage.getLastId() + " recordID = " + lastPage.getRecordId());
+ trace("LastPage loaded was " + lastPage.getLastId() + " recordID = " + lastPage.getRecordId());
this.getPageStore(lastPage.getDestination()).setLastRecord(lastPage);
}
Modified: trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java 2008-09-10 21:03:06 UTC (rev 4929)
+++ trunk/src/main/org/jboss/messaging/core/persistence/StorageManager.java 2008-09-11 02:14:12 UTC (rev 4930)
@@ -62,7 +62,7 @@
void storeAcknowledgeTransactional(long txID, long queueID, long messageiD) throws Exception;
- void storeDeleteMessageTransactional(long txID, long messageID, long queueID) throws Exception;
+ void storeDeleteMessageTransactional(long txID, long queueID, long messageID) throws Exception;
/** Used to delete non-messaging data (such as PageTransaction and LasPage) */
void storeDeleteTransactional(long txID, long recordID) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-09-10 21:03:06 UTC (rev 4929)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalStorageManager.java 2008-09-11 02:14:12 UTC (rev 4930)
@@ -250,7 +250,7 @@
messageJournal.appendDeleteRecordTransactional(txID, recordID, null);
}
- public void storeDeleteMessageTransactional(long txID, long messageID, long queueID) throws Exception
+ public void storeDeleteMessageTransactional(long txID, long queueID, long messageID) throws Exception
{
messageJournal.appendDeleteRecordTransactional(txID, messageID, new DeleteEncoding(queueID));
}
@@ -289,63 +289,6 @@
idSequence.set(maxID + 1);
- //recover prepared transactions
- for (PreparedTransactionInfo preparedTransaction : preparedTransactions)
- {
- log.trace(preparedTransaction);
- EncodingXid encodingXid = new EncodingXid(preparedTransaction.extraData);
- Xid xid = encodingXid.xid;
-
- Transaction tx = new TransactionImpl(preparedTransaction.id, xid, this, postOffice);
- List<ServerMessage> messages = new ArrayList<ServerMessage>();
- List<ServerMessage> messagesToDelete = new ArrayList<ServerMessage>();
- //first get any sent messages for this tx and recreate
- for (RecordInfo record : preparedTransaction.records)
- {
- byte[] data = record.data;
-
- ByteBuffer bb = ByteBuffer.wrap(data);
-
- MessagingBuffer buff = new ByteBufferWrapper(bb);
-
- ServerMessage message = new ServerMessageImpl(record.id);
-
- message.decode(buff);
-
- messages.add(message);
- }
- //ok now find if any records to be deleted which aren't necessarily with this tx
- List<RecordInfo> recordsToDelete = new ArrayList<RecordInfo>();
- for (RecordInfo record : records)
- {
- if(preparedTransaction.recordsToDelete.contains(record.id))
- {
- byte[] data = record.data;
-
- ByteBuffer bb = ByteBuffer.wrap(data);
-
- byte recordType = record.getUserRecordType();
-
- MessagingBuffer buff = new ByteBufferWrapper(bb);
-
- ServerMessage message = new ServerMessageImpl(record.id);
-
- message.decode(buff);
-
- messagesToDelete.add(message);
-
- recordsToDelete.add(record);
- }
- }
- //now we recreate the state of the tx and add to th erresource manager
- tx.replay(messages, messagesToDelete, Transaction.State.PREPARED);
- resourceManager.putTransaction(xid, tx);
- //and finally since we've dealt with the records we don't need to process them.
- for (RecordInfo recordInfo : recordsToDelete)
- {
- records.remove(recordInfo);
- }
- }
for (RecordInfo record: records)
{
byte[] data = record.data;
@@ -463,8 +406,11 @@
}
}
}
- }
-
+
+ loadPreparedTransactions(postOffice, queues, resourceManager,preparedTransactions);
+
+ }
+
//Bindings operations
public void addBinding(Binding binding) throws Exception
@@ -646,6 +592,118 @@
// Private ----------------------------------------------------------------------------------
+ private void loadPreparedTransactions(final PostOffice postOffice,
+ final Map<Long, Queue> queues, ResourceManager resourceManager,
+ List<PreparedTransactionInfo> preparedTransactions) throws Exception
+ {
+ //recover prepared transactions
+ for (PreparedTransactionInfo preparedTransaction : preparedTransactions)
+ {
+ log.trace(preparedTransaction);
+ EncodingXid encodingXid = new EncodingXid(preparedTransaction.extraData);
+ Xid xid = encodingXid.xid;
+
+ Transaction tx = new TransactionImpl(preparedTransaction.id, xid, this, postOffice);
+ List<MessageReference> messages = new ArrayList<MessageReference>();
+ List<MessageReference> messagesToAck = new ArrayList<MessageReference>();
+
+ PageTransactionInfoImpl pageTransactionInfo = null;
+
+ //first get any sent messages for this tx and recreate
+ for (RecordInfo record : preparedTransaction.records)
+ {
+ byte[] data = record.data;
+
+ ByteBuffer bb = ByteBuffer.wrap(data);
+
+ MessagingBuffer buff = new ByteBufferWrapper(bb);
+
+ byte recordType = record.getUserRecordType();
+
+ switch(recordType)
+ {
+ case ADD_MESSAGE:
+ {
+ ServerMessage message = new ServerMessageImpl(record.id);
+
+ message.decode(buff);
+
+ List<MessageReference> refs = postOffice.route(message);
+ messages.addAll(refs);
+ break;
+ }
+ case ACKNOWLEDGE_REF:
+ {
+ long messageID = record.id;
+
+ ACKEncoding encoding = new ACKEncoding();
+ encoding.decode(buff);
+
+
+ Queue queue = queues.get(encoding.queueID);
+
+ if (queue == null)
+ {
+ throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
+ }
+
+ MessageReference removed = queue.removeReferenceWithID(messageID);
+
+ messagesToAck.add(removed);
+ if (removed == null)
+ {
+ throw new IllegalStateException("Failed to remove reference for " + messageID);
+ }
+ break;
+ }
+ case PAGE_TRANSACTION:
+ {
+ pageTransactionInfo = new PageTransactionInfoImpl();
+ pageTransactionInfo.decode(buff);
+ pageTransactionInfo.markIncomplete();
+ break;
+ }
+ default:
+ log.warn("InternalError: Record type " + recordType + " not recognized. Maybe you're using journal files created on a different version" );
+ }
+ }
+
+ for (RecordInfo record : preparedTransaction.recordsToDelete)
+ {
+ byte[] data = record.data;
+
+ ByteBuffer bb = ByteBuffer.wrap(data);
+
+ MessagingBuffer buff = new ByteBufferWrapper(bb);
+
+ long messageID = record.id;
+
+ DeleteEncoding encoding = new DeleteEncoding();
+ encoding.decode(buff);
+
+
+ Queue queue = queues.get(encoding.queueID);
+
+ if (queue == null)
+ {
+ throw new IllegalStateException("Cannot find queue with id " + encoding.queueID);
+ }
+
+ MessageReference removed = queue.removeReferenceWithID(messageID);
+
+ messagesToAck.add(removed);
+ if (removed == null)
+ {
+ throw new IllegalStateException("Failed to remove reference for " + messageID);
+ }
+ }
+
+ //now we recreate the state of the tx and add to the resource manager
+ tx.replay(messages, messagesToAck, pageTransactionInfo, Transaction.State.PREPARED);
+ resourceManager.putTransaction(xid, tx);
+ }
+ }
+
private void checkAndCreateDir(String dir, boolean create)
{
File f = new File(dir);
Modified: trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java 2008-09-10 21:03:06 UTC (rev 4929)
+++ trunk/src/main/org/jboss/messaging/core/transaction/Transaction.java 2008-09-11 02:14:12 UTC (rev 4930)
@@ -23,6 +23,7 @@
package org.jboss.messaging.core.transaction;
import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.paging.PageTransactionInfo;
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.settings.HierarchicalRepository;
@@ -66,7 +67,7 @@
void markAsRollbackOnly(MessagingException messagingException);
- void replay(List<ServerMessage> messages, List<ServerMessage> acknowledgements, State prepared) throws Exception;
+ void replay(List<MessageReference> messages, List<MessageReference> acknowledgements, PageTransactionInfo pageTransaction, State prepared) throws Exception;
static enum State
{
Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-09-10 21:03:06 UTC (rev 4929)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-09-11 02:14:12 UTC (rev 4930)
@@ -24,6 +24,7 @@
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.paging.PageTransactionInfo;
import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.paging.impl.PageTransactionInfoImpl;
import org.jboss.messaging.core.persistence.StorageManager;
@@ -60,7 +61,7 @@
private final List<ServerMessage> pagedMessages = new ArrayList<ServerMessage>();
- private PageTransactionInfoImpl pageTransaction;
+ private PageTransactionInfo pageTransaction;
private final Xid xid;
@@ -200,13 +201,11 @@
if (count == 0)
{
- storageManager.storeDeleteTransactional(id, message
- .getMessageID());
+ storageManager.storeDeleteMessageTransactional(id, queue.getPersistenceID(), message.getMessageID());
}
else
{
- storageManager.storeAcknowledgeTransactional(id, queue
- .getPersistenceID(), message.getMessageID());
+ storageManager.storeAcknowledgeTransactional(id, queue.getPersistenceID(), message.getMessageID());
}
containsPersistent = true;
@@ -317,6 +316,11 @@
{
storageManager.rollback(id);
}
+
+ if (state == State.PREPARED && pageTransaction != null)
+ {
+ pageTransaction.forget();
+ }
Map<Queue, LinkedList<MessageReference>> queueMap = new HashMap<Queue, LinkedList<MessageReference>>();
@@ -415,22 +419,19 @@
this.messagingException = messagingException;
}
- public void replay(List<ServerMessage> messages, List<ServerMessage> acknowledgements, State prepared) throws Exception
+ public void replay(List<MessageReference> messages, List<MessageReference> acknowledgements, PageTransactionInfo pageTransaction, State prepared) throws Exception
{
containsPersistent = true;
- //acknowledgements.add
- for (ServerMessage message : messages)
+ refsToAdd.addAll(messages);
+ this.acknowledgements.addAll(acknowledgements);
+ this.pageTransaction = pageTransaction;
+
+ if (this.pageTransaction != null)
{
- List<MessageReference> refs = postOffice.route(message);
- refsToAdd.addAll(refs);
+ pagingManager.addTransaction(this.pageTransaction);
}
- for (ServerMessage message : acknowledgements)
- {
- List<MessageReference> refs = postOffice.route(message);
- this.acknowledgements.addAll(refs);
- }
+
state = prepared;
-
}
public void setContainsPersistent(final boolean containsPersistent)
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java 2008-09-10 21:03:06 UTC (rev 4929)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/xa/BasicXaRecoveryTest.java 2008-09-11 02:14:12 UTC (rev 4930)
@@ -28,7 +28,9 @@
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.server.MessagingService;
import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.core.settings.impl.QueueSettings;
import org.jboss.messaging.core.transaction.impl.XidImpl;
+import org.jboss.messaging.jms.client.JBossBytesMessage;
import org.jboss.messaging.jms.client.JBossTextMessage;
import org.jboss.messaging.tests.util.UnitTestCase;
import org.jboss.messaging.util.SimpleString;
@@ -38,6 +40,8 @@
import javax.transaction.xa.Xid;
import java.io.File;
import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
/**
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
@@ -46,9 +50,12 @@
{
private static final String ACCEPTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory";
private static final String CONNECTOR_FACTORY = "org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory";
+
+ private Map<String, QueueSettings> queueSettings = new HashMap<String, QueueSettings>();
private String journalDir = System.getProperty("java.io.tmpdir", "/tmp") + "/xa-recovery-test/journal";
private String bindingsDir = System.getProperty("java.io.tmpdir", "/tmp") + "/xa-recovery-test/bindings";
+ private String pageDir = System.getProperty("java.io.tmpdir", "/tmp") + "/xa-recovery-test/page";
private MessagingService messagingService;
private ClientSession clientSession;
private ClientProducer clientProducer;
@@ -59,15 +66,20 @@
protected void setUp() throws Exception
{
+ queueSettings.clear();
File file = new File(journalDir);
File file2 = new File(bindingsDir);
+ File file3 = new File(pageDir);
deleteDirectory(file);
file.mkdirs();
deleteDirectory(file2);
file2.mkdirs();
+ deleteDirectory(file3);
+ file3.mkdirs();
configuration = new ConfigurationImpl();
configuration.setSecurityEnabled(false);
configuration.setJournalMinFiles(2);
+ configuration.setPagingDirectory(pageDir);
TransportConfiguration transportConfig = new TransportConfiguration(ACCEPTOR_FACTORY);
configuration.getAcceptorConfigurations().add(transportConfig);
@@ -220,6 +232,142 @@
testMultipleTxReceiveWithRollback(true);
}
+
+ public void testPagingServerRestarted() throws Exception
+ {
+ testPaging(true);
+ }
+
+ public void testPaging() throws Exception
+ {
+ testPaging(false);
+ }
+
+ public void testPaging(boolean restartServer) throws Exception
+ {
+ Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
+
+ SimpleString pageQueue = new SimpleString("pagequeue");
+
+ QueueSettings pageQueueSettings = new QueueSettings();
+ pageQueueSettings.setMaxSizeBytes(100*1024);
+ pageQueueSettings.setPageSizeBytes(10*1024);
+
+ queueSettings.put(pageQueue.toString(), pageQueueSettings);
+
+ addSettings();
+
+ clientSession.createQueue(pageQueue, pageQueue, null, true, true);
+
+ clientSession.start(xid, XAResource.TMNOFLAGS);
+
+ ClientProducer pageProducer = clientSession.createProducer(pageQueue);
+
+ for (int i = 0; i < 1000; i++)
+ {
+ ClientMessage m = createBytesMessage(new byte[512], true);
+ pageProducer.send(m);
+ }
+
+ pageProducer.close();
+
+ clientSession.end(xid, XAResource.TMSUCCESS);
+ clientSession.prepare(xid);
+
+ if (restartServer)
+ {
+ stopAndRestartServer();
+ }
+ else
+ {
+ recreateClients();
+ }
+
+ Xid[] xids = clientSession.recover(XAResource.TMSTARTRSCAN);
+ assertEquals(xids.length, 1);
+ assertEquals(xids[0].getFormatId(), xid.getFormatId());
+ assertEqualsByteArrays(xids[0].getBranchQualifier(), xid.getBranchQualifier());
+ assertEqualsByteArrays(xids[0].getGlobalTransactionId(), xid.getGlobalTransactionId());
+
+ clientSession.commit(xid, true);
+
+ clientSession.start();
+
+ ClientConsumer pageConsumer = clientSession.createConsumer(pageQueue);
+
+ for (int i = 0; i < 1000; i++)
+ {
+ ClientMessage m = pageConsumer.receive(10000);
+ assertNotNull(m);
+ clientSession.acknowledge();
+ }
+
+ }
+
+ public void testRollbackPaging() throws Exception
+ {
+ testRollbackPaging(false);
+ }
+
+ public void testRollbackPagingServerRestarted() throws Exception
+ {
+ testRollbackPaging(true);
+ }
+
+ public void testRollbackPaging(boolean restartServer) throws Exception
+ {
+ Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
+
+ SimpleString pageQueue = new SimpleString("pagequeue");
+
+ QueueSettings pageQueueSettings = new QueueSettings();
+ pageQueueSettings.setMaxSizeBytes(100*1024);
+ pageQueueSettings.setPageSizeBytes(10*1024);
+
+ queueSettings.put(pageQueue.toString(), pageQueueSettings);
+
+ addSettings();
+
+ clientSession.createQueue(pageQueue, pageQueue, null, true, true);
+
+ clientSession.start(xid, XAResource.TMNOFLAGS);
+
+ ClientProducer pageProducer = clientSession.createProducer(pageQueue);
+
+ for (int i = 0; i < 1000; i++)
+ {
+ ClientMessage m = createBytesMessage(new byte[512], true);
+ pageProducer.send(m);
+ }
+
+ clientSession.end(xid, XAResource.TMSUCCESS);
+ clientSession.prepare(xid);
+
+ if (restartServer)
+ {
+ stopAndRestartServer();
+ }
+ else
+ {
+ recreateClients();
+ }
+
+ Xid[] xids = clientSession.recover(XAResource.TMSTARTRSCAN);
+ assertEquals(1, xids.length);
+ assertEquals(xids[0].getFormatId(), xid.getFormatId());
+ assertEqualsByteArrays(xids[0].getBranchQualifier(), xid.getBranchQualifier());
+ assertEqualsByteArrays(xids[0].getGlobalTransactionId(), xid.getGlobalTransactionId());
+
+ clientSession.rollback(xid);
+
+ clientSession.start();
+
+ ClientConsumer pageConsumer = clientSession.createConsumer(pageQueue);
+
+ assertNull(pageConsumer.receive(100));
+
+ }
+
public void testNonPersistent() throws Exception
{
testNonPersistent(true);
@@ -227,7 +375,7 @@
}
- public void testNonPersistent(boolean commit) throws Exception
+ public void testNonPersistent(final boolean commit) throws Exception
{
Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
@@ -254,9 +402,52 @@
assertEqualsByteArrays(xids[0].getGlobalTransactionId(), xid.getGlobalTransactionId());
xids = clientSession.recover(XAResource.TMENDRSCAN);
assertEquals(xids.length, 0);
- clientSession.commit(xid, true);
+ if (commit)
+ {
+ clientSession.commit(xid, true);
+ }
+ else
+ {
+ clientSession.rollback(xid);
+ }
}
+ public void testNonPersistentMultipleIDs() throws Exception
+ {
+ for (int i = 0; i < 10; i++)
+ {
+ Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
+
+ ClientMessage m1 = createTextMessage("m1", false);
+ ClientMessage m2 = createTextMessage("m2", false);
+ ClientMessage m3 = createTextMessage("m3", false);
+ ClientMessage m4 = createTextMessage("m4", false);
+
+ clientSession.start(xid, XAResource.TMNOFLAGS);
+ clientProducer.send(m1);
+ clientProducer.send(m2);
+ clientProducer.send(m3);
+ clientProducer.send(m4);
+ clientSession.end(xid, XAResource.TMSUCCESS);
+ clientSession.prepare(xid);
+
+ if (i == 2)
+ {
+ clientSession.commit(xid, true);
+ }
+
+ recreateClients();
+
+
+ }
+
+ stopAndRestartServer();
+
+ Xid[] xids = clientSession.recover(XAResource.TMSTARTRSCAN);
+
+ assertEquals(9, xids.length);
+ }
+
public void testBasicSendWithCommit(boolean stopServer) throws Exception
{
Xid xid = new XidImpl("xa1".getBytes(), 1, new GUID().toString().getBytes());
@@ -284,13 +475,14 @@
}
Xid[] xids = clientSession.recover(XAResource.TMSTARTRSCAN);
-
assertEquals(xids.length, 1);
assertEquals(xids[0].getFormatId(), xid.getFormatId());
assertEqualsByteArrays(xids[0].getBranchQualifier(), xid.getBranchQualifier());
assertEqualsByteArrays(xids[0].getGlobalTransactionId(), xid.getGlobalTransactionId());
+
xids = clientSession.recover(XAResource.TMENDRSCAN);
assertEquals(xids.length, 0);
+
clientSession.commit(xid, true);
clientSession.start();
ClientMessage m = clientConsumer.receive(1000);
@@ -997,10 +1189,21 @@
messagingService.stop();
messagingService = null;
messagingService = MessagingServiceImpl.newNioStorageMessagingServer(configuration, journalDir, bindingsDir);
+
+ addSettings();
+
messagingService.start();
createClients();
}
+ private void addSettings()
+ {
+ for (Map.Entry<String, QueueSettings> setting: this.queueSettings.entrySet())
+ {
+ messagingService.getServer().getQueueSettingsRepository().addMatch(setting.getKey(), setting.getValue());
+ }
+ }
+
protected void recreateClients() throws Exception
{
clientSession.close();
@@ -1021,6 +1224,14 @@
return message;
}
+ private ClientMessage createBytesMessage(byte[] b, boolean durable)
+ {
+ ClientMessage message = clientSession.createClientMessage(JBossBytesMessage.TYPE, durable, 0, System.currentTimeMillis(), (byte) 1);
+ message.getBody().putBytes(b);
+ message.getBody().flip();
+ return message;
+ }
+
private void createClients()
throws MessagingException
{
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java 2008-09-10 21:03:06 UTC (rev 4929)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/MessageReferenceImplTest.java 2008-09-11 02:14:12 UTC (rev 4930)
@@ -142,6 +142,8 @@
Queue queue = EasyMock.createStrictMock(Queue.class);
+ EasyMock.expect(queue.getPersistenceID()).andStubReturn(1);
+
MessageReferenceImpl messageReference = new DummyMessageReference(serverMessage, queue);
messageReference.setDeliveryCount(1);
@@ -219,6 +221,8 @@
Queue queue = EasyMock.createStrictMock(Queue.class);
+ EasyMock.expect(queue.getPersistenceID()).andStubReturn(1);
+
MessageReferenceImpl messageReference = new DummyMessageReference(serverMessage, queue);
messageReference.setDeliveryCount(1);
@@ -284,6 +288,9 @@
HierarchicalRepository<QueueSettings> repos = EasyMock.createStrictMock(HierarchicalRepository.class);
ServerMessage serverMessage = EasyMock.createStrictMock(ServerMessage.class);
Queue queue = EasyMock.createStrictMock(Queue.class);
+
+ EasyMock.expect(queue.getPersistenceID()).andStubReturn(1);
+
MessageReferenceImpl messageReference = new DummyMessageReference(serverMessage, queue);
messageReference.setDeliveryCount(1);
SimpleString queueName = new SimpleString("queueName");
@@ -323,6 +330,9 @@
ServerMessage serverMessage = EasyMock.createNiceMock(ServerMessage.class);
Queue queue = EasyMock.createStrictMock(Queue.class);
+
+ EasyMock.expect(queue.getPersistenceID()).andStubReturn(1);
+
MessageReferenceImpl messageReference = new DummyMessageReference(serverMessage, queue);
messageReference.setDeliveryCount(1);
SimpleString queueName = new SimpleString("queueName");
@@ -377,6 +387,9 @@
ServerMessage serverMessage = EasyMock.createNiceMock(ServerMessage.class);
Queue queue = EasyMock.createStrictMock(Queue.class);
+
+ EasyMock.expect(queue.getPersistenceID()).andStubReturn(1);
+
MessageReferenceImpl messageReference = new DummyMessageReference(serverMessage, queue);
messageReference.setDeliveryCount(1);
SimpleString queueName = new SimpleString("queueName");
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java 2008-09-10 21:03:06 UTC (rev 4929)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/QueueImplTest.java 2008-09-11 02:14:12 UTC (rev 4930)
@@ -770,7 +770,7 @@
{
if (i % 2 == 0)
{
- storageManager.storeDeleteTransactional(1, i);
+ storageManager.storeDeleteMessageTransactional(1, queue.getPersistenceID(), i);
}
}
@@ -778,7 +778,7 @@
{
if (i % 2 == 0)
{
- storageManager.storeDeleteTransactional(1, i);
+ storageManager.storeDeleteMessageTransactional(1, queue.getPersistenceID(), i);
}
}
@@ -1341,7 +1341,7 @@
StorageManager storageManager = EasyMock.createMock(StorageManager.class);
EasyMock.expect(storageManager.generateTransactionID()).andReturn(randomLong());
EasyMock.expect(storageManager.generateID()).andReturn(randomLong());
- storageManager.storeDeleteTransactional(EasyMock.anyLong(), EasyMock.eq(messageID));
+ storageManager.storeDeleteMessageTransactional(EasyMock.anyLong(), EasyMock.eq(queue.getPersistenceID()), EasyMock.eq(messageID));
storageManager.commit(EasyMock.anyLong());
PostOffice postOffice = createMock(PostOffice.class);
@@ -1399,7 +1399,7 @@
StorageManager storageManager = createMock(StorageManager.class);
expect(storageManager.generateTransactionID()).andReturn(randomLong());
expect(storageManager.generateID()).andReturn(randomLong());
- storageManager.storeDeleteTransactional(anyLong(), eq(messageID));
+ storageManager.storeDeleteMessageTransactional(anyLong(), eq(queue.getPersistenceID()), eq(messageID));
storageManager.commit(anyLong());
PostOffice postOffice = createMock(PostOffice.class);
@@ -1460,7 +1460,7 @@
StorageManager storageManager = EasyMock.createMock(StorageManager.class);
EasyMock.expect(storageManager.generateID()).andReturn(newMessageID);
EasyMock.expect(storageManager.generateTransactionID()).andReturn(tid);
- storageManager.storeDeleteTransactional(EasyMock.anyLong(), EasyMock.eq(messageID));
+ storageManager.storeDeleteMessageTransactional(EasyMock.anyLong(), EasyMock.eq(queue.getPersistenceID()), EasyMock.eq(messageID));
storageManager.commit(EasyMock.anyLong());
PostOffice postOffice = EasyMock.createMock(PostOffice.class);
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java 2008-09-10 21:03:06 UTC (rev 4929)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/transaction/impl/TransactionImplTest.java 2008-09-11 02:14:12 UTC (rev 4930)
@@ -585,7 +585,7 @@
//Expect:
sm.storeAcknowledgeTransactional(txID, queue1.getPersistenceID(), message1.getMessageID());
- sm.storeDeleteTransactional(txID, message1.getMessageID());
+ sm.storeDeleteMessageTransactional(txID, queue2.getPersistenceID(), message1.getMessageID());
EasyMock.replay(sm);
More information about the jboss-cvs-commits
mailing list