[jboss-cvs] JBoss Messaging SVN: r3456 - in branches/Branch_New_Persistence: src/main/org/jboss/messaging/core/impl/postoffice and 7 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Dec 10 07:53:34 EST 2007
Author: timfox
Date: 2007-12-10 07:53:34 -0500 (Mon, 10 Dec 2007)
New Revision: 3456
Added:
branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/newcore/impl/TransactionTest.java
branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/newcore/impl/bdbje/BDBJEPersistenceManagerTest.java
Removed:
branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/newcore/impl/BDBJEPersistenceManagerTest.java
Modified:
branches/Branch_New_Persistence/.classpath
branches/Branch_New_Persistence/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/MessageImpl.java
branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/TransactionImpl.java
branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJEPersistenceManager.java
branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/intf/PersistenceManager.java
branches/Branch_New_Persistence/src/main/org/jboss/test/concurrent/messaging/newcore/impl/QueueTest.java
branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/UnitTestCase.java
branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/newcore/impl/MessageTest.java
Log:
More changes
Modified: branches/Branch_New_Persistence/.classpath
===================================================================
--- branches/Branch_New_Persistence/.classpath 2007-12-10 11:22:13 UTC (rev 3455)
+++ branches/Branch_New_Persistence/.classpath 2007-12-10 12:53:34 UTC (rev 3456)
@@ -61,5 +61,6 @@
<classpathentry kind="lib" path="thirdparty/jbossas/core-libs/lib/jboss-xml-binding.jar"/>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER"/>
<classpathentry kind="lib" path="/home/tim/work/je-3.2.44/lib/je-3.2.44.jar"/>
+ <classpathentry kind="lib" path="/home/tim/work/easymock2.3/easymock.jar"/>
<classpathentry kind="output" path="bin"/>
</classpath>
Modified: branches/Branch_New_Persistence/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java 2007-12-10 11:22:13 UTC (rev 3455)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java 2007-12-10 12:53:34 UTC (rev 3456)
@@ -38,6 +38,7 @@
import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.MembershipListener;
+import org.jgroups.MergeView;
import org.jgroups.Message;
import org.jgroups.MessageListener;
import org.jgroups.Receiver;
@@ -455,6 +456,11 @@
public void viewAccepted(final View newView)
{
log.debug(this + " got new view " + newView + ", old view is " + currentView);
+
+ if (newView instanceof MergeView)
+ {
+ //TODO handle merging after split-brain
+ }
// JGroups will make sure this method is never called by more than one thread concurrently
Modified: branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/MessageImpl.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/MessageImpl.java 2007-12-10 11:22:13 UTC (rev 3455)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/MessageImpl.java 2007-12-10 12:53:34 UTC (rev 3456)
@@ -100,6 +100,7 @@
long timestamp, byte priority)
{
this.messageID = messageID;
+ this.type = type;
this.reliable = reliable;
this.expiration = expiration;
this.timestamp = timestamp;
Modified: branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/TransactionImpl.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/TransactionImpl.java 2007-12-10 11:22:13 UTC (rev 3455)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/TransactionImpl.java 2007-12-10 12:53:34 UTC (rev 3456)
@@ -26,7 +26,7 @@
import javax.transaction.xa.Xid;
-import org.jboss.logging.Logger;
+import org.jboss.messaging.newcore.intf.Message;
import org.jboss.messaging.newcore.intf.MessageReference;
import org.jboss.messaging.newcore.intf.PersistenceManager;
import org.jboss.messaging.newcore.intf.Transaction;
@@ -41,12 +41,8 @@
*/
public class TransactionImpl implements Transaction
{
- private static final Logger log = Logger.getLogger(TransactionImpl.class);
+ private List<Message> messagesToAdd;
- private static final boolean trace = log.isTraceEnabled();
-
- private List<MessageReference> refsToAdd;
-
private List<MessageReference> refsToRemove;
private List<TransactionSynchronization> synchronizations = new ArrayList<TransactionSynchronization>();
@@ -55,16 +51,26 @@
private boolean containsPersistent;
- public TransactionImpl(List<MessageReference> refsToAdd, List<MessageReference> refsToRemove,
+ private boolean prepared;
+
+ public TransactionImpl(List<Message> messagesToAdd, List<MessageReference> refsToRemove,
boolean containsPersistent)
{
- this.refsToAdd = refsToAdd;
+ this.messagesToAdd = messagesToAdd;
this.refsToRemove = refsToRemove;
this.containsPersistent = containsPersistent;
}
+ public TransactionImpl(Xid xid, List<Message> messagesToAdd, List<MessageReference> refsToRemove,
+ boolean containsPersistent)
+ {
+ this(messagesToAdd, refsToRemove, containsPersistent);
+
+ this.xid = xid;
+ }
+
// Transaction implementation -----------------------------------------------------------
public void addSynchronization(TransactionSynchronization sync)
@@ -72,7 +78,7 @@
synchronizations.add(sync);
}
- public void prepare(PersistenceManager persistenceManager)
+ public void prepare(PersistenceManager persistenceManager) throws Exception
{
if (xid == null)
{
@@ -80,7 +86,9 @@
}
else
{
+ persistenceManager.prepareTransaction(xid, messagesToAdd, refsToRemove);
+ prepared = true;
}
}
@@ -93,52 +101,30 @@
if (xid == null)
{
//1PC commit
- PersistenceTransaction tx = null;
- try
- {
- tx = persistenceManager.createTransaction(false);
-
- playOperations(tx, persistenceManager);
-
- tx.commit();
-
- }
- catch (Exception e)
- {
- try
- {
- tx.rollback();
- }
- catch (Throwable t)
- {
- if (trace) { log.trace("Failed to rollback", t); }
- }
- throw e;
- }
+ persistenceManager.commitTransaction(messagesToAdd, refsToRemove);
}
else
{
//2PC commit
- PersistenceTransaction tx = null;
+ if (!prepared)
+ {
+ throw new IllegalStateException("Transaction is not prepared");
+ }
- tx = persistenceManager.getTransaction(xid);
-
- playOperations(tx, persistenceManager);
-
- tx.commit();
-
-
+ persistenceManager.commitPreparedTransaction(xid);
}
}
-
-
+
//Now add to queue(s)
- for (MessageReference reference: refsToAdd)
+ for (Message msg: messagesToAdd)
{
- reference.getQueue().addLast(reference);
+ for (MessageReference ref: msg.getReferences())
+ {
+ ref.getQueue().addLast(ref);
+ }
}
callSynchronizations(SyncType.AFTER_COMMIT);
@@ -154,33 +140,14 @@
}
else
{
-
+ persistenceManager.unprepareTransaction(xid, messagesToAdd, refsToRemove);
}
- callSynchronizations(SyncType.AFTER_ROLLBACK);
+ callSynchronizations(SyncType.AFTER_ROLLBACK);
}
// Private -------------------------------------------------------------------
- private void playOperations(PersistenceTransaction tx, PersistenceManager persistenceManager) throws Exception
- {
- for (MessageReference reference: refsToAdd)
- {
- if (reference.getMessage().isReliable())
- {
- persistenceManager.addReference(tx, reference.getQueue(), reference);
- }
- }
-
- for (MessageReference reference: refsToRemove)
- {
- if (reference.getMessage().isReliable())
- {
- persistenceManager.removeReference(tx, reference.getQueue(), reference);
- }
- }
- }
-
private void callSynchronizations(SyncType type) throws Exception
{
for (TransactionSynchronization sync: synchronizations)
@@ -197,7 +164,7 @@
{
sync.beforeRollback();
}
- else if (type == SyncType.AFTER_ROLLBACK);
+ else if (type == SyncType.AFTER_ROLLBACK)
{
sync.afterRollback();
}
Modified: branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJEPersistenceManager.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJEPersistenceManager.java 2007-12-10 11:22:13 UTC (rev 3455)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/impl/bdbje/BDBJEPersistenceManager.java 2007-12-10 12:53:34 UTC (rev 3456)
@@ -35,8 +35,6 @@
import org.jboss.messaging.newcore.intf.Queue;
import org.jboss.messaging.util.Pair;
-import com.sleepycat.je.DatabaseEntry;
-
/**
*
* A PersistenceManager implementation that stores messages using Berkeley DB Java Edition.
@@ -143,12 +141,14 @@
environment.stop();
+ recovery = false;
+
started = false;
}
// PersistenceManager implementation ----------------------------------------------------------
- public void commitMessage(Message message) throws Exception
+ public void addMessage(Message message) throws Exception
{
BDBJETransaction tx = null;
@@ -175,19 +175,18 @@
}
}
}
-
- public void commitMessages(List<Message> messages)
+
+ public void commitTransaction(List<Message> messagesToAdd,
+ List<MessageReference> referencesToRemove) throws Exception
{
+
BDBJETransaction tx = null;
try
{
tx = environment.createTransaction();
- for (Message message: messages)
- {
- internalCommitMessage(tx, message);
- }
+ playTx(tx, messagesToAdd, referencesToRemove);
tx.commit();
}
@@ -206,38 +205,46 @@
}
}
}
-
- public void deleteReferences(List<MessageReference> references)
- {
- BDBJETransaction tx = null;
-
+
+ public void prepareTransaction(Xid xid, List<Message> messagesToAdd,
+ List<MessageReference> referencesToRemove) throws Exception
+ {
+ environment.startWork(xid);
+
try
- {
- tx = environment.createTransaction();
-
- for (MessageReference ref: references)
- {
- internalDeleteReference(tx, ref);
- }
-
- tx.commit();
+ {
+ playTx(null, messagesToAdd, referencesToRemove);
}
catch (Exception e)
{
try
{
- if (tx != null)
- {
- tx.rollback();
- }
+ environment.endWork(xid, true);
}
catch (Throwable ignore)
{
- if (trace) { log.trace("Failed to rollback", ignore); }
+ if (trace) { log.trace("Failed to end", ignore); }
}
+
+ throw e;
}
+
+ environment.endWork(xid, false);
+
+ environment.prepare(xid);
}
+ public void commitPreparedTransaction(Xid xid) throws Exception
+ {
+ environment.commit(xid);
+ }
+
+ public void unprepareTransaction(Xid xid, List<Message> messagesToAdd,
+ List<MessageReference> referencesToRemove) throws Exception
+ {
+ environment.rollback(xid);
+ }
+
public void deleteReference(MessageReference reference)
{
BDBJETransaction tx = null;
@@ -268,6 +275,11 @@
public List<Xid> getInDoubtXids() throws Exception
{
+ if (!recovery)
+ {
+ throw new IllegalStateException("Must be in recovery mode to call getInDoubtXids()");
+ }
+
return environment.getInDoubtXids();
}
@@ -283,128 +295,107 @@
public void loadQueues(Map<Long, Queue> queues) throws Exception
{
- BDBJECursor cursorMessage = this.messageDB.cursor();
+ BDBJECursor cursorMessage = null;
- BDBJECursor cursorRef = this.refDB.cursor();
+ BDBJECursor cursorRef = null;
- Pair<Long, byte[]> messagePair;
-
- Pair<Long, byte[]> refPair;
-
- while ((messagePair = cursorMessage.getNext()) != null)
+ try
{
- refPair = cursorRef.getNext();
+ cursorMessage = messageDB.cursor();
- if (refPair == null)
- {
- throw new IllegalStateException("Message and ref data out of sync");
- }
-
- long id = messagePair.a;
+ cursorRef = refDB.cursor();
- byte[] bytes = messagePair.b;
-
- ByteBuffer buffer = ByteBuffer.wrap(bytes);
+ Pair<Long, byte[]> messagePair;
- int type = buffer.getInt();
+ Pair<Long, byte[]> refPair;
- long expiration = buffer.getLong();
-
- long timestamp = buffer.getLong();
-
- byte priority = buffer.get();
-
- int headerSize = buffer.getInt();
-
- //TODO we can optimise this to prevent a copy - let the message use a window on the byte[]
-
- byte[] headers = new byte[headerSize];
-
- buffer.get(headers);
-
- int payloadSize = buffer.getInt();
-
- byte[] payload = new byte[payloadSize];
-
- buffer.get(payload);
-
- Message message = new MessageImpl(id, type, true, expiration, timestamp, priority,
- headers, payload);
-
- //Now the ref data
-
- byte[] refBytes = refPair.b;
-
- buffer = ByteBuffer.wrap(refBytes);
-
- while (buffer.hasRemaining())
+ while ((messagePair = cursorMessage.getNext()) != null)
{
- long queueID = buffer.getLong();
+ refPair = cursorRef.getNext();
- int deliveryCount = buffer.getInt();
+ if (refPair == null)
+ {
+ throw new IllegalStateException("Message and ref data out of sync");
+ }
+
+ long id = messagePair.a;
- long scheduledDeliveryTime = buffer.getLong();
+ byte[] bytes = messagePair.b;
+
+ ByteBuffer buffer = ByteBuffer.wrap(bytes);
- Queue queue = queues.get(queueID);
+ int type = buffer.getInt();
- if (queue == null)
+ long expiration = buffer.getLong();
+
+ long timestamp = buffer.getLong();
+
+ byte priority = buffer.get();
+
+ int headerSize = buffer.getInt();
+
+ //TODO we can optimise this to prevent a copy - let the message use a window on the byte[]
+
+ byte[] headers = new byte[headerSize];
+
+ buffer.get(headers);
+
+ int payloadSize = buffer.getInt();
+
+ byte[] payload = new byte[payloadSize];
+
+ buffer.get(payload);
+
+ Message message = new MessageImpl(id, type, true, expiration, timestamp, priority,
+ headers, payload);
+
+ //Now the ref data
+
+ byte[] refBytes = refPair.b;
+
+ buffer = ByteBuffer.wrap(refBytes);
+
+ while (buffer.hasRemaining())
{
- //Ok - queue is not deployed
- }
- else
- {
- MessageReference reference = message.createReference(queue);
+ long queueID = buffer.getLong();
- reference.setDeliveryCount(deliveryCount);
+ int deliveryCount = buffer.getInt();
- reference.setScheduledDeliveryTime(scheduledDeliveryTime);
+ long scheduledDeliveryTime = buffer.getLong();
- queue.addLast(reference);
- }
- }
- }
- }
-
- public void prepareMessages(Xid xid, List<Message> messages) throws Exception
- {
- environment.startWork(xid);
-
- try
- {
- for (Message message: messages)
- {
- internalCommitMessage(null, message);
+ Queue queue = queues.get(queueID);
+
+ if (queue == null)
+ {
+ //Ok - queue is not deployed
+ }
+ else
+ {
+ MessageReference reference = message.createReference(queue);
+
+ reference.setDeliveryCount(deliveryCount);
+
+ reference.setScheduledDeliveryTime(scheduledDeliveryTime);
+
+ queue.addLast(reference);
+ }
+ }
}
}
- catch (Exception e)
+ finally
{
- try
+ if (cursorMessage != null)
{
- environment.endWork(xid, true);
+ cursorMessage.close();
}
- catch (Throwable ignore)
+
+ if (cursorRef != null)
{
- if (trace) { log.trace("Failed to end", ignore); }
+ cursorRef.close();
}
-
- throw e;
- }
+ }
+ }
- environment.endWork(xid, false);
-
- environment.prepare(xid);
- }
-
- public void commitPreparedMessages(Xid xid) throws Exception
- {
- environment.commit(xid);
- }
-
- public void unprepareMessages(Xid xid, List<Message> messages) throws Exception
- {
- environment.rollback(xid);
- }
-
public void updateDeliveryCount(Queue queue, MessageReference ref) throws Exception
{
//TODO - optimise this scan
@@ -455,8 +446,27 @@
}
// Private ---------------------------------------------------------------------------------
-
-
+
+ private void playTx(BDBJETransaction tx, List<Message> messagesToAdd,
+ List<MessageReference> referencesToRemove) throws Exception
+ {
+ if (messagesToAdd != null)
+ {
+ for (Message message: messagesToAdd)
+ {
+ internalCommitMessage(tx, message);
+ }
+ }
+
+ if (referencesToRemove != null)
+ {
+ for (MessageReference ref: referencesToRemove)
+ {
+ internalDeleteReference(tx, ref);
+ }
+ }
+ }
+
private void internalCommitMessage(BDBJETransaction tx, Message message) throws Exception
{
//First store the message
Modified: branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/intf/PersistenceManager.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/intf/PersistenceManager.java 2007-12-10 11:22:13 UTC (rev 3455)
+++ branches/Branch_New_Persistence/src/main/org/jboss/messaging/newcore/intf/PersistenceManager.java 2007-12-10 12:53:34 UTC (rev 3456)
@@ -41,33 +41,33 @@
* to 1 or more queues.
* @param message
*/
- public void commitMessage(Message message) throws Exception;
+ public void addMessage(Message message) throws Exception;
/**
- * Multiple messages, each potentially with multiple message references needs to be added to storage
- * This would occur when a 1 PC transaction containing many messages arrives on the server and they need to be routed
- * to 1 or more queues.
- * @param messages
+ * Delete a single reference. This would also delete the message if it is no longer referenced by any other
+ * references.
+ * This would occur on acknowledgement of a single reference
+ * @param message
*/
- public void commitMessages(List<Message> messages) throws Exception;
-
+ void deleteReference(MessageReference reference) throws Exception;
+
/**
- * Multiple messages, each potentially with multiple message references needs to be added to storage
- * in prepared state - used for XA functionality
- * This would occur when a 2 PC transaction containing many messages arrives on the server and they need to routed
- * to 1 or more queues, but prepared first
- * @param xid - the Xid of the transaction
- * @param messages
+ * Commit a transaction containing messages to add and references to remove
+ * @param messagesToAdd List of messages to add, or null if none
+ * @param referencesToRemove List of references to remove, or null if none
+ * @throws Exception
*/
- public void prepareMessages(Xid xid, List<Message> messages) throws Exception;
+ public void commitTransaction(List<Message> messagesToAdd, List<MessageReference> referencesToRemove) throws Exception;
/**
- * Unprepare a List of messages - each with potentially many message references
- * This would occur when a 2 PC transaction rolls back
- * @param xid - the Xid of the transaction
- * @param messages
+ * Prepare a transaction containing messages to add and references to remove
+ * @param xid The Xid of the XA transaction
+ * @param messagesToAdd List of messages to add, or null if none
+ * @param referencesToRemove List of references to remove, or null if none
+ * @throws Exception
*/
- public void unprepareMessages(Xid xid, List<Message> messages) throws Exception;
+ public void prepareTransaction(Xid xid, List<Message> messagesToAdd,
+ List<MessageReference> referencesToRemove) throws Exception;
/**
* Commit a prepared transaction
@@ -75,24 +75,20 @@
* @param xid
* @throws Exception
*/
- public void commitPreparedMessages(Xid xid) throws Exception;
+ public void commitPreparedTransaction(Xid xid) throws Exception;
- /**
- * Delete a single reference. This would also delete the message if it is no longer referenced by any other
- * references.
- * This would occur on acknowledgement of a single reference
- * @param message
- */
- void deleteReference(MessageReference reference) throws Exception;
/**
- * Delete a list of references from storage, also deleting their corresponding messages if they are no longer
- * referenced.
- * This would occur on acknowledgement of multiple references in a transaction.
- * @param messages
+ * Unprepare a transaction containing messages to add and references to remove
+ * @param xid The Xid of the XA transaction
+ * @param messagesToAdd List of messages to add, or null if none
+ * @param referencesToRemove List of references to remove, or null if none
+ * @throws Exception
*/
- void deleteReferences(List<MessageReference> references) throws Exception;
+ public void unprepareTransaction(Xid xid, List<Message> messagesToAdd,
+ List<MessageReference> referencesToRemove) throws Exception;
+
/**
* Update the delivery count of a reference
* @param queue
Modified: branches/Branch_New_Persistence/src/main/org/jboss/test/concurrent/messaging/newcore/impl/QueueTest.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/test/concurrent/messaging/newcore/impl/QueueTest.java 2007-12-10 11:22:13 UTC (rev 3455)
+++ branches/Branch_New_Persistence/src/main/org/jboss/test/concurrent/messaging/newcore/impl/QueueTest.java 2007-12-10 12:53:34 UTC (rev 3456)
@@ -26,6 +26,7 @@
import org.jboss.messaging.newcore.impl.QueueImpl;
import org.jboss.messaging.newcore.intf.HandleStatus;
+import org.jboss.messaging.newcore.intf.Message;
import org.jboss.messaging.newcore.intf.MessageReference;
import org.jboss.messaging.newcore.intf.Queue;
import org.jboss.test.unit.fakes.messaging.newcore.impl.FakeConsumer;
@@ -128,8 +129,10 @@
while (System.currentTimeMillis() - start < testTime)
{
- MessageReference ref = generateReference(queue, i);
+ Message message = generateMessage(i);
+ MessageReference ref = message.createReference(queue);
+
queue.addLast(ref);
refs.add(ref);
Modified: branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/UnitTestCase.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/UnitTestCase.java 2007-12-10 11:22:13 UTC (rev 3455)
+++ branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/UnitTestCase.java 2007-12-10 12:53:34 UTC (rev 3456)
@@ -146,9 +146,25 @@
{
Message message = new MessageImpl(id, 0, true, 0, System.currentTimeMillis(), (byte)4);
+ byte[] bytes = new byte[1024];
+
+ for (int i = 0; i < 1024; i++)
+ {
+ bytes[i] = (byte)i;
+ }
+
+ message.setPayload(bytes);
+
return message;
}
+ protected MessageReference generateReference(Queue queue, long id)
+ {
+ Message message = generateMessage(id);
+
+ return message.createReference(queue);
+ }
+
protected void assertEquivalent(Message msg1, Message msg2)
{
assertEquals(msg1.getMessageID(), msg2.getMessageID());
@@ -185,6 +201,8 @@
assertEquals(ref1.getScheduledDeliveryTime(), ref2.getScheduledDeliveryTime());
assertEquals(ref1.getDeliveryCount(), ref2.getDeliveryCount());
+
+ assertEquals(ref1.getQueue(), ref2.getQueue());
}
}
Deleted: branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/newcore/impl/BDBJEPersistenceManagerTest.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/newcore/impl/BDBJEPersistenceManagerTest.java 2007-12-10 11:22:13 UTC (rev 3455)
+++ branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/newcore/impl/BDBJEPersistenceManagerTest.java 2007-12-10 12:53:34 UTC (rev 3456)
@@ -1,681 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2005, JBoss Inc., and individual contributors as indicated
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.test.unit.messaging.newcore.impl;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.jboss.messaging.newcore.impl.MessageImpl;
-import org.jboss.messaging.newcore.impl.QueueImpl;
-import org.jboss.messaging.newcore.impl.bdbje.BDBJEDatabase;
-import org.jboss.messaging.newcore.impl.bdbje.BDBJEEnvironment;
-import org.jboss.messaging.newcore.impl.bdbje.BDBJEPersistenceManager;
-import org.jboss.messaging.newcore.intf.Message;
-import org.jboss.messaging.newcore.intf.MessageReference;
-import org.jboss.messaging.newcore.intf.Queue;
-import org.jboss.test.unit.fakes.messaging.newcore.impl.bdbje.FakeBDBJEEnvironment;
-import org.jboss.test.unit.messaging.UnitTestCase;
-
-/**
- *
- * A BDBJEPersistenceManagerTest
- *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- *
- */
-public class BDBJEPersistenceManagerTest extends UnitTestCase
-{
- protected static final String ENV_DIR = "test-env";
-
- protected BDBJEPersistenceManager pm;
-
- protected BDBJEEnvironment bdb;
-
- protected void setUp() throws Exception
- {
- super.setUp();
-
- bdb = new FakeBDBJEEnvironment();
-
- pm = new BDBJEPersistenceManager(bdb, ENV_DIR);
-
- pm.start();
- }
-
- protected void tearDown() throws Exception
- {
- super.tearDown();
-
- pm.stop();
- }
-
- // The tests ----------------------------------------------------------------
-
- public void testCommitMessage() throws Exception
- {
- Message m = generateMessage(1);
-
- Queue queue = new QueueImpl(67);
-
- m.createReference(queue);
-
- m.createReference(queue);
-
- m.createReference(queue);
-
- m.createReference(queue);
-
- pm.commitMessage(m);
-
- BDBJEDatabase msgDB = bdb.getDatabase(BDBJEPersistenceManager.MESSAGE_DB_NAME);
-
- BDBJEDatabase refDB = bdb.getDatabase(BDBJEPersistenceManager.REFERENCE_DB_NAME);
-
- byte[] msgBytes = msgDB.get(m.getMessageID());
-
- assertNotNull(msgBytes);
-
- byte[] refBytes = refDB.get(m.getMessageID());
-
- assertNotNull(refBytes);
-
- Map<Long, Queue> queues = new HashMap<Long, Queue>();
-
- queues.put(queue.getID(), queue);
-
- Message m2 = extractMessage(queues, m.getMessageID(), msgBytes, refBytes);
-
- assertEquivalent(m, m2);
- }
-
- public void testCommitMessages() throws Exception
- {
- List<Message> msgs = new ArrayList<Message>();
-
- final int numMessages = 10;
-
- Queue queue = new QueueImpl(67);
-
- for (int i = 0; i < numMessages; i++)
- {
- Message m = generateMessage(i);
-
- m.createReference(queue);
-
- m.createReference(queue);
-
- m.createReference(queue);
-
- m.createReference(queue);
-
- msgs.add(m);
- }
-
- pm.commitMessages(msgs);
-
- BDBJEDatabase msgDB = bdb.getDatabase(BDBJEPersistenceManager.MESSAGE_DB_NAME);
-
- BDBJEDatabase refDB = bdb.getDatabase(BDBJEPersistenceManager.REFERENCE_DB_NAME);
-
- for (int i = 0; i < numMessages; i++)
- {
- Message m = msgs.get(i);
-
- byte[] msgBytes = msgDB.get(m.getMessageID());
-
- assertNotNull(msgBytes);
-
- byte[] refBytes = refDB.get(m.getMessageID());
-
- assertNotNull(refBytes);
-
- Map<Long, Queue> queues = new HashMap<Long, Queue>();
-
- queues.put(queue.getID(), queue);
-
- Message m2 = extractMessage(queues, m.getMessageID(), msgBytes, refBytes);
-
- assertEquivalent(m, m2);
- }
- }
-
- public void testDeleteReference() throws Exception
- {
- Message m = generateMessage(1);
-
- Queue queue = new QueueImpl(67);
-
- MessageReference ref1 = m.createReference(queue);
-
- MessageReference ref2 = m.createReference(queue);
-
- MessageReference ref3 = m.createReference(queue);
-
- MessageReference ref4 = m.createReference(queue);
-
- pm.commitMessage(m);
-
- BDBJEDatabase msgDB = bdb.getDatabase(BDBJEPersistenceManager.MESSAGE_DB_NAME);
-
- BDBJEDatabase refDB = bdb.getDatabase(BDBJEPersistenceManager.REFERENCE_DB_NAME);
-
- byte[] msgBytes = msgDB.get(m.getMessageID());
-
- assertNotNull(msgBytes);
-
- byte[] refBytes = refDB.get(m.getMessageID());
-
- assertNotNull(refBytes);
-
- Map<Long, Queue> queues = new HashMap<Long, Queue>();
-
- queues.put(queue.getID(), queue);
-
- Message m2 = extractMessage(queues, m.getMessageID(), msgBytes, refBytes);
-
- assertEquivalent(m, m2);
-
- assertEquals(4, m2.getReferences().size());
-
-
- pm.deleteReference(ref2);
-
- msgBytes = msgDB.get(m.getMessageID());
-
- assertNotNull(msgBytes);
-
- refBytes = refDB.get(m.getMessageID());
-
- assertNotNull(refBytes);
-
- queues = new HashMap<Long, Queue>();
-
- queues.put(queue.getID(), queue);
-
- m2 = extractMessage(queues, m.getMessageID(), msgBytes, refBytes);
-
- assertEquivalent(m, m2);
-
- assertEquals(3, m2.getReferences().size());
-
-
- pm.deleteReference(ref1);
-
- msgBytes = msgDB.get(m.getMessageID());
-
- assertNotNull(msgBytes);
-
- refBytes = refDB.get(m.getMessageID());
-
- assertNotNull(refBytes);
-
- queues = new HashMap<Long, Queue>();
-
- queues.put(queue.getID(), queue);
-
- m2 = extractMessage(queues, m.getMessageID(), msgBytes, refBytes);
-
- assertEquivalent(m, m2);
-
- assertEquals(2, m2.getReferences().size());
-
-
- pm.deleteReference(ref3);
-
- msgBytes = msgDB.get(m.getMessageID());
-
- assertNotNull(msgBytes);
-
- refBytes = refDB.get(m.getMessageID());
-
- assertNotNull(refBytes);
-
- queues = new HashMap<Long, Queue>();
-
- queues.put(queue.getID(), queue);
-
- m2 = extractMessage(queues, m.getMessageID(), msgBytes, refBytes);
-
- assertEquivalent(m, m2);
-
- assertEquals(1, m2.getReferences().size());
-
-
- pm.deleteReference(ref4);
-
- msgBytes = msgDB.get(m.getMessageID());
-
- assertNull(msgBytes);
-
- refBytes = refDB.get(m.getMessageID());
-
- assertNull(refBytes);
- }
-
- public void testDeleteReferences() throws Exception
- {
- //pm.deleteReferences(references)
-
- // pm.
- }
-
-
-
-
-//
-// public void testAddMultipleReferencesForSameMessageNoTx() throws Exception
-// {
-// FakeBDBJEIntf fakeBDB = new FakeBDBJEIntf();
-//
-// PersistenceManager pm = new BDBJEPersistenceManager(fakeBDB);
-//
-// MessageReference ref1 = generateReference(1);
-//
-// Queue queue = new QueueImpl(67);
-//
-// pm.addReference(null, queue, ref1);
-//
-// MessageReference ref2 = ref1.getMessage().createReference();
-//
-// pm.addReference(null, queue, ref2);
-//
-// MessageReference ref3 = ref1.getMessage().createReference();
-//
-// pm.addReference(null, queue, ref3);
-//
-// MessageReference ref4 = ref1.getMessage().createReference();
-//
-// pm.addReference(null, queue, ref4);
-//
-// byte[] bytes = fakeBDB.getStore().get(ref1.getMessage().getMessageID());
-//
-// assertNotNull(bytes);
-//
-// Message message = extractMessage(ref1.getMessage().getMessageID(), bytes);
-//
-// assertEquals(4, message.getReferences().size());
-//
-// assertTrue(message.isReliable());
-//
-// assertEquivalent(ref1.getMessage(), message);
-// }
-//
-// public void testRemoveReferenceNoTx() throws Exception
-// {
-// FakeBDBJEIntf fakeBDB = new FakeBDBJEIntf();
-//
-// PersistenceManager pm = new BDBJEPersistenceManager(fakeBDB);
-//
-// MessageReference ref1 = generateReference(1);
-//
-// Queue queue = new QueueImpl(67);
-//
-// pm.addReference(null, queue, ref1);
-//
-// MessageReference ref2 = ref1.getMessage().createReference();
-//
-// pm.addReference(null, queue, ref2);
-//
-// MessageReference ref3 = ref1.getMessage().createReference();
-//
-// pm.addReference(null, queue, ref3);
-//
-// MessageReference ref4 = ref1.getMessage().createReference();
-//
-// pm.addReference(null, queue, ref4);
-//
-// byte[] bytes = fakeBDB.getStore().get(ref1.getMessage().getMessageID());
-//
-// assertNotNull(bytes);
-//
-// Message message = extractMessage(ref1.getMessage().getMessageID(), bytes);
-//
-// assertEquals(4, message.getReferences().size());
-//
-// assertTrue(message.isReliable());
-//
-// assertEquivalent(ref1.getMessage(), message);
-//
-//
-// pm.removeReference(null, queue, ref3);
-//
-// bytes = fakeBDB.getStore().get(ref1.getMessage().getMessageID());
-//
-// assertNotNull(bytes);
-//
-// message = extractMessage(ref1.getMessage().getMessageID(), bytes);
-//
-// assertEquals(3, message.getReferences().size());
-//
-// assertTrue(message.isReliable());
-//
-// assertEquivalent(ref1.getMessage(), message);
-//
-//
-// pm.removeReference(null, queue, ref1);
-//
-// bytes = fakeBDB.getStore().get(ref1.getMessage().getMessageID());
-//
-// assertNotNull(bytes);
-//
-// message = extractMessage(ref1.getMessage().getMessageID(), bytes);
-//
-// assertEquals(2, message.getReferences().size());
-//
-// assertTrue(message.isReliable());
-//
-// assertEquivalent(ref1.getMessage(), message);
-//
-//
-// pm.removeReference(null, queue, ref2);
-//
-// bytes = fakeBDB.getStore().get(ref1.getMessage().getMessageID());
-//
-// assertNotNull(bytes);
-//
-// message = extractMessage(ref1.getMessage().getMessageID(), bytes);
-//
-// assertEquals(1, message.getReferences().size());
-//
-// assertTrue(message.isReliable());
-//
-// assertEquivalent(ref1.getMessage(), message);
-//
-//
-// pm.removeReference(null, queue, ref4);
-//
-// bytes = fakeBDB.getStore().get(ref1.getMessage().getMessageID());
-//
-// assertNull(bytes);
-//
-//
-// assertTrue(fakeBDB.getStore().isEmpty());
-// }
-//
-// public void testUpdateDeliveryCount() throws Exception
-// {
-// FakeBDBJEIntf fakeBDB = new FakeBDBJEIntf();
-//
-// PersistenceManager pm = new BDBJEPersistenceManager(fakeBDB);
-//
-// MessageReference ref1 = generateReference(1);
-//
-// Queue queue = new QueueImpl(67);
-//
-// pm.addReference(null, queue, ref1);
-//
-// MessageReference ref2 = ref1.getMessage().createReference();
-//
-// pm.addReference(null, queue, ref2);
-//
-// MessageReference ref3 = ref1.getMessage().createReference();
-//
-// pm.addReference(null, queue, ref3);
-//
-// MessageReference ref4 = ref1.getMessage().createReference();
-//
-// pm.addReference(null, queue, ref4);
-//
-// ref2.setDeliveryCount(765);
-//
-// pm.updateDeliveryCount(queue, ref2);
-//
-// byte[] bytes = fakeBDB.getStore().get(ref1.getMessage().getMessageID());
-//
-// assertNotNull(bytes);
-//
-// Message message = extractMessage(ref1.getMessage().getMessageID(), bytes);
-//
-// assertEquals(4, message.getReferences().size());
-//
-// assertTrue(message.isReliable());
-//
-// assertEquivalent(ref1.getMessage(), message);
-//
-// assertEquals(765, ref2.getDeliveryCount());
-//
-// assertEquals(ref2.getDeliveryCount(), message.getReferences().get(1).getDeliveryCount());
-//
-// ref4.setDeliveryCount(10101);
-//
-// pm.updateDeliveryCount(queue, ref4);
-//
-// bytes = fakeBDB.getStore().get(ref1.getMessage().getMessageID());
-//
-// assertNotNull(bytes);
-//
-// message = extractMessage(ref1.getMessage().getMessageID(), bytes);
-//
-// assertEquals(4, message.getReferences().size());
-//
-// assertTrue(message.isReliable());
-//
-// assertEquivalent(ref1.getMessage(), message);
-//
-// assertEquals(10101, ref4.getDeliveryCount());
-//
-// assertEquals(ref4.getDeliveryCount(), message.getReferences().get(3).getDeliveryCount());
-//
-// assertEquals(ref1.getDeliveryCount(), message.getReferences().get(0).getDeliveryCount());
-//
-// assertEquals(ref3.getDeliveryCount(), message.getReferences().get(2).getDeliveryCount());
-// }
-//
-// public void testAddReferenceTxCommit() throws Exception
-// {
-// FakeBDBJEIntf fakeBDB = new FakeBDBJEIntf();
-//
-// PersistenceManager pm = new BDBJEPersistenceManager(fakeBDB);
-//
-// MessageReference ref1 = generateReference(1);
-//
-// Queue queue = new QueueImpl(67);
-//
-// PersistenceTransaction tx = pm.createTransaction();
-//
-// pm.addReference(tx, queue, ref1);
-//
-// byte[] bytes = fakeBDB.getStore().get(ref1.getMessage().getMessageID());
-//
-// assertNull(bytes);
-//
-// tx.commit();
-//
-// bytes = fakeBDB.getStore().get(ref1.getMessage().getMessageID());
-//
-// assertNotNull(bytes);
-//
-// Message message = extractMessage(ref1.getMessage().getMessageID(), bytes);
-//
-// assertTrue(message.isReliable());
-//
-// assertEquivalent(ref1.getMessage(), message);
-// }
-//
-// public void testAddReferenceTxRollback() throws Exception
-// {
-// FakeBDBJEIntf fakeBDB = new FakeBDBJEIntf();
-//
-// PersistenceManager pm = new BDBJEPersistenceManager(fakeBDB);
-//
-// MessageReference ref1 = generateReference(1);
-//
-// Queue queue = new QueueImpl(67);
-//
-// PersistenceTransaction tx = pm.createTransaction();
-//
-// pm.addReference(tx, queue, ref1);
-//
-// byte[] bytes = fakeBDB.getStore().get(ref1.getMessage().getMessageID());
-//
-// assertNull(bytes);
-//
-// tx.rollback();
-//
-// bytes = fakeBDB.getStore().get(ref1.getMessage().getMessageID());
-//
-// assertNull(bytes);
-// }
-//
-// /*
-// test adding different refs for different queues in same tx
-//
-// plus removing refs for different queues in same tx
-//
-// test loading queues (multiple quuees)
-//
-// test loading queues with different queues holding refs for same message (like subs)
-// */
-//
-// public void testLoadQueues() throws Exception
-// {
-// Queue queue1 = new QueueImpl(1);
-//
-// Queue queue2 = new QueueImpl(2);
-//
-// Queue queue3 = new QueueImpl(3);
-//
-// FakeBDBJEIntf fakeBDB = new FakeBDBJEIntf();
-//
-// PersistenceManager pm = new BDBJEPersistenceManager(fakeBDB);
-//
-// MessageReference ref1 = generateReference(1);
-// MessageReference ref2 = generateReference(2);
-// MessageReference ref3 = generateReference(3);
-// MessageReference ref4 = generateReference(4);
-// MessageReference ref5 = generateReference(5);
-// MessageReference ref6 = generateReference(6);
-// MessageReference ref7 = generateReference(7);
-// MessageReference ref8 = generateReference(8);
-// MessageReference ref9 = generateReference(9);
-// MessageReference ref10 = generateReference(10);
-//
-// pm.addReference(null, queue1, ref1);
-// pm.addReference(null, queue1, ref2);
-// pm.addReference(null, queue1, ref3);
-//
-// pm.addReference(null, queue2, ref4);
-// pm.addReference(null, queue2, ref5);
-// pm.addReference(null, queue2, ref6);
-//
-// pm.addReference(null, queue3, ref7);
-// pm.addReference(null, queue3, ref8);
-// pm.addReference(null, queue3, ref9);
-// pm.addReference(null, queue3, ref10);
-//
-// assertEquals(10, fakeBDB.getStore().size());
-//
-// Map<Long, Queue> map = new HashMap<Long, Queue>();
-//
-// map.put(queue1.getID(), queue1);
-// map.put(queue2.getID(), queue2);
-// map.put(queue3.getID(), queue3);
-//
-// pm.loadQueues(map);
-//
-// assertEquals(3, queue1.getMessageCount());
-// assertEquals(3, queue2.getMessageCount());
-// assertEquals(4, queue3.getMessageCount());
-//
-// List<MessageReference> refs = new ArrayList<MessageReference>();
-//
-// refs.add(ref1);
-// refs.add(ref2);
-// refs.add(ref3);
-// assertRefListsEquivalent(queue1.list(null), refs);
-//
-// refs.clear();
-// refs.add(ref4);
-// refs.add(ref5);
-// refs.add(ref6);
-// assertRefListsEquivalent(queue2.list(null), refs);
-//
-// refs.clear();
-// refs.add(ref7);
-// refs.add(ref8);
-// refs.add(ref9);
-// refs.add(ref10);
-// assertRefListsEquivalent(queue3.list(null), refs);
-//
-// }
-//
-// // Private --------------------------------------------------------------------
-
- private Message extractMessage(Map<Long, Queue> queues, long id, byte[] msgBytes, byte[] refBytes) throws Exception
- {
- ByteBuffer buffer = ByteBuffer.wrap(msgBytes);
-
- int type = buffer.getInt();
-
- long expiration = buffer.getLong();
-
- long timestamp = buffer.getLong();
-
- byte priority = buffer.get();
-
- int headerSize = buffer.getInt();
-
- byte[] headers = new byte[headerSize];
-
- buffer.get(headers);
-
- int payloadSize = buffer.getInt();
-
- byte[] payload = null;
-
- if (payloadSize != 0)
- {
- payload = new byte[payloadSize];
-
- buffer.get(payload);
- }
-
- Message message = new MessageImpl(id, type, true, expiration, timestamp, priority,
- headers, payload);
-
- buffer = ByteBuffer.wrap(refBytes);
-
- while (buffer.hasRemaining())
- {
- long queueID = buffer.getLong();
-
- int deliveryCount = buffer.getInt();
-
- long scheduledDeliveryTime = buffer.getLong();
-
- MessageReference reference = message.createReference(queues.get(queueID));
-
- reference.setDeliveryCount(deliveryCount);
-
- reference.setScheduledDeliveryTime(scheduledDeliveryTime);
- }
-
- return message;
- }
-
-
- // Inner classes ---------------------------------------------------------------
-
-}
Modified: branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/newcore/impl/MessageTest.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/newcore/impl/MessageTest.java 2007-12-10 11:22:13 UTC (rev 3455)
+++ branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/newcore/impl/MessageTest.java 2007-12-10 12:53:34 UTC (rev 3456)
@@ -35,7 +35,7 @@
*
* Tests for Message and MessageReference
*
- * TODO - Test header and payload, streaming and destreaming
+ * TODO - Test streaming and destreaming
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
*
@@ -146,6 +146,18 @@
assertEquals(connectionID, message.getConnectionID());
}
+ public void testSetAndGetPayload()
+ {
+ Message message = new MessageImpl();
+
+ assertNull(message.getPayload());
+
+ byte[] bytes = "blah blah blah".getBytes();
+ message.setPayload(bytes);
+
+ assertByteArraysEquivalent(bytes, message.getPayload());
+ }
+
public void testHeaders()
{
Message message = new MessageImpl();
Added: branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/newcore/impl/TransactionTest.java
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/newcore/impl/TransactionTest.java (rev 0)
+++ branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/newcore/impl/TransactionTest.java 2007-12-10 12:53:34 UTC (rev 3456)
@@ -0,0 +1,326 @@
+package org.jboss.test.unit.messaging.newcore.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.transaction.xa.Xid;
+
+import org.easymock.EasyMock;
+import org.jboss.messaging.newcore.impl.QueueImpl;
+import org.jboss.messaging.newcore.impl.TransactionImpl;
+import org.jboss.messaging.newcore.intf.Message;
+import org.jboss.messaging.newcore.intf.MessageReference;
+import org.jboss.messaging.newcore.intf.PersistenceManager;
+import org.jboss.messaging.newcore.intf.Queue;
+import org.jboss.messaging.newcore.intf.Transaction;
+import org.jboss.messaging.newcore.intf.TransactionSynchronization;
+import org.jboss.test.unit.messaging.UnitTestCase;
+
+/**
+ *
+ * A TransactionTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class TransactionTest extends UnitTestCase
+{
+
+ public void test1PCCommit() throws Exception
+ {
+ List<Message> msgsToAdd = new ArrayList<Message>();
+
+ List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
+
+ Queue queue = new QueueImpl(1);
+
+ MessageReference ref1 = this.generateReference(queue, 1);
+ msgsToAdd.add(ref1.getMessage());
+
+ MessageReference ref2 = this.generateReference(queue, 2);
+ refsToRemove.add(ref2);
+
+ Transaction tx = new TransactionImpl(msgsToAdd, refsToRemove, true);
+
+ PersistenceManager pm = EasyMock.createStrictMock(PersistenceManager.class);
+
+ pm.commitTransaction(msgsToAdd, refsToRemove);
+
+ EasyMock.replay(pm);
+
+ tx.commit(pm);
+
+ EasyMock.verify(pm);
+
+ assertEquals(ref1, queue.list(null).get(0));
+ }
+
+ public void test1PCRollback() throws Exception
+ {
+ List<Message> msgsToAdd = new ArrayList<Message>();
+
+ List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
+
+ Queue queue = new QueueImpl(1);
+
+ MessageReference ref1 = this.generateReference(queue, 1);
+ msgsToAdd.add(ref1.getMessage());
+
+ MessageReference ref2 = this.generateReference(queue, 2);
+ refsToRemove.add(ref2);
+
+ Transaction tx = new TransactionImpl(msgsToAdd, refsToRemove, true);
+
+ PersistenceManager pm = EasyMock.createStrictMock(PersistenceManager.class);
+
+ EasyMock.replay(pm);
+
+ tx.rollback(pm);
+
+ EasyMock.verify(pm);
+
+ assertTrue(queue.list(null).isEmpty());
+ }
+
+ public void test1PCPrepare() throws Exception
+ {
+ List<Message> msgsToAdd = new ArrayList<Message>();
+
+ List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
+
+ Queue queue = new QueueImpl(1);
+
+ MessageReference ref1 = this.generateReference(queue, 1);
+ msgsToAdd.add(ref1.getMessage());
+
+ MessageReference ref2 = this.generateReference(queue, 2);
+ refsToRemove.add(ref2);
+
+ Transaction tx = new TransactionImpl(msgsToAdd, refsToRemove, true);
+
+ PersistenceManager pm = EasyMock.createStrictMock(PersistenceManager.class);
+
+ try
+ {
+ tx.prepare(pm);
+ fail("Should throw exception");
+ }
+ catch (IllegalStateException e)
+ {
+ //OK
+ }
+
+ assertTrue(queue.list(null).isEmpty());
+ }
+
+ public void test2PCPrepareCommit() throws Exception
+ {
+ List<Message> msgsToAdd = new ArrayList<Message>();
+
+ List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
+
+ Queue queue = new QueueImpl(1);
+
+ MessageReference ref1 = this.generateReference(queue, 1);
+ msgsToAdd.add(ref1.getMessage());
+
+ MessageReference ref2 = this.generateReference(queue, 2);
+ refsToRemove.add(ref2);
+
+ Xid xid = generateXid();
+
+ Transaction tx = new TransactionImpl(xid, msgsToAdd, refsToRemove, true);
+
+ PersistenceManager pm = EasyMock.createStrictMock(PersistenceManager.class);
+
+ pm.prepareTransaction(xid, msgsToAdd, refsToRemove);
+
+ EasyMock.replay(pm);
+
+ tx.prepare(pm);
+
+ EasyMock.verify(pm);
+
+ EasyMock.reset(pm);
+
+ pm.commitPreparedTransaction(xid);
+
+ EasyMock.replay(pm);
+
+ tx.commit(pm);
+
+ EasyMock.verify(pm);
+ }
+
+ public void test2PCCommitBeforePrepare() throws Exception
+ {
+ List<Message> msgsToAdd = new ArrayList<Message>();
+
+ List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
+
+ Queue queue = new QueueImpl(1);
+
+ MessageReference ref1 = this.generateReference(queue, 1);
+ msgsToAdd.add(ref1.getMessage());
+
+ MessageReference ref2 = this.generateReference(queue, 2);
+ refsToRemove.add(ref2);
+
+ Xid xid = generateXid();
+
+ Transaction tx = new TransactionImpl(xid, msgsToAdd, refsToRemove, true);
+
+ PersistenceManager pm = EasyMock.createStrictMock(PersistenceManager.class);
+
+ try
+ {
+ tx.commit(pm);
+
+ fail ("Should throw exception");
+ }
+ catch (IllegalStateException e)
+ {
+ //Ok
+ }
+ }
+
+ public void test2PCPrepareRollback() throws Exception
+ {
+ List<Message> msgsToAdd = new ArrayList<Message>();
+
+ List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
+
+ Queue queue = new QueueImpl(1);
+
+ MessageReference ref1 = this.generateReference(queue, 1);
+ msgsToAdd.add(ref1.getMessage());
+
+ MessageReference ref2 = this.generateReference(queue, 2);
+ refsToRemove.add(ref2);
+
+ Xid xid = generateXid();
+
+ Transaction tx = new TransactionImpl(xid, msgsToAdd, refsToRemove, true);
+
+ PersistenceManager pm = EasyMock.createStrictMock(PersistenceManager.class);
+
+ pm.prepareTransaction(xid, msgsToAdd, refsToRemove);
+
+ EasyMock.replay(pm);
+
+ tx.prepare(pm);
+
+ EasyMock.verify(pm);
+
+ EasyMock.reset(pm);
+
+ pm.unprepareTransaction(xid, msgsToAdd, refsToRemove);
+
+ EasyMock.replay(pm);
+
+ tx.rollback(pm);
+
+ EasyMock.verify(pm);
+ }
+
+ public void testSynchronizations() throws Exception
+ {
+ List<Message> msgsToAdd = new ArrayList<Message>();
+
+ List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
+
+ Queue queue = new QueueImpl(1);
+
+ MessageReference ref1 = this.generateReference(queue, 1);
+ msgsToAdd.add(ref1.getMessage());
+
+ MessageReference ref2 = this.generateReference(queue, 2);
+ refsToRemove.add(ref2);
+
+ Transaction tx = new TransactionImpl(msgsToAdd, refsToRemove, true);
+
+ TransactionSynchronization sync = EasyMock.createStrictMock(TransactionSynchronization.class);
+
+ PersistenceManager pm = EasyMock.createStrictMock(PersistenceManager.class);
+
+ tx.addSynchronization(sync);
+
+ sync.beforeCommit();
+ sync.afterCommit();
+
+ EasyMock.replay(sync);
+
+ tx.commit(pm);
+
+ EasyMock.verify(sync);
+
+ EasyMock.reset(sync);
+
+ tx = new TransactionImpl(msgsToAdd, refsToRemove, true);
+
+ tx.addSynchronization(sync);
+
+ sync.beforeRollback();
+ sync.afterRollback();
+
+ EasyMock.replay(sync);
+
+ tx.rollback(pm);
+
+ EasyMock.verify(sync);
+ }
+
+ public void testSynchronizations2PC() throws Exception
+ {
+ List<Message> msgsToAdd = new ArrayList<Message>();
+
+ List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
+
+ Queue queue = new QueueImpl(1);
+
+ MessageReference ref1 = this.generateReference(queue, 1);
+ msgsToAdd.add(ref1.getMessage());
+
+ MessageReference ref2 = this.generateReference(queue, 2);
+ refsToRemove.add(ref2);
+
+ Xid xid = generateXid();
+
+ Transaction tx = new TransactionImpl(xid, msgsToAdd, refsToRemove, true);
+
+ TransactionSynchronization sync = EasyMock.createStrictMock(TransactionSynchronization.class);
+
+ PersistenceManager pm = EasyMock.createStrictMock(PersistenceManager.class);
+
+ tx.addSynchronization(sync);
+
+ sync.beforeCommit();
+ sync.afterCommit();
+
+ EasyMock.replay(sync);
+
+ tx.prepare(pm);
+ tx.commit(pm);
+
+ EasyMock.verify(sync);
+
+ EasyMock.reset(sync);
+
+ xid = generateXid();
+
+ tx = new TransactionImpl(xid, msgsToAdd, refsToRemove, true);
+
+ tx.addSynchronization(sync);
+
+ sync.beforeRollback();
+ sync.afterRollback();
+
+ EasyMock.replay(sync);
+
+ tx.prepare(pm);
+ tx.rollback(pm);
+
+ EasyMock.verify(sync);
+ }
+
+}
Copied: branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/newcore/impl/bdbje/BDBJEPersistenceManagerTest.java (from rev 3414, branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/newcore/impl/BDBJEPersistenceManagerTest.java)
===================================================================
--- branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/newcore/impl/bdbje/BDBJEPersistenceManagerTest.java (rev 0)
+++ branches/Branch_New_Persistence/src/main/org/jboss/test/unit/messaging/newcore/impl/bdbje/BDBJEPersistenceManagerTest.java 2007-12-10 12:53:34 UTC (rev 3456)
@@ -0,0 +1,786 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.test.unit.messaging.newcore.impl.bdbje;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.transaction.xa.Xid;
+
+import org.jboss.messaging.newcore.impl.MessageImpl;
+import org.jboss.messaging.newcore.impl.QueueImpl;
+import org.jboss.messaging.newcore.impl.bdbje.BDBJEDatabase;
+import org.jboss.messaging.newcore.impl.bdbje.BDBJEEnvironment;
+import org.jboss.messaging.newcore.impl.bdbje.BDBJEPersistenceManager;
+import org.jboss.messaging.newcore.intf.Message;
+import org.jboss.messaging.newcore.intf.MessageReference;
+import org.jboss.messaging.newcore.intf.Queue;
+import org.jboss.test.unit.fakes.messaging.newcore.impl.bdbje.FakeBDBJEEnvironment;
+import org.jboss.test.unit.messaging.UnitTestCase;
+
+/**
+ *
+ * A BDBJEPersistenceManagerTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ *
+ */
+public class BDBJEPersistenceManagerTest extends UnitTestCase
+{
+ protected static final String ENV_DIR = "test-env";
+
+ protected BDBJEPersistenceManager pm;
+
+ protected BDBJEEnvironment bdb;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+
+ bdb = new FakeBDBJEEnvironment();
+
+ pm = new BDBJEPersistenceManager(bdb, ENV_DIR);
+
+ pm.start();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+
+ pm.stop();
+ }
+
+ // The tests ----------------------------------------------------------------
+
+ public void testAddMessage() throws Exception
+ {
+ Queue queue = new QueueImpl(67);
+
+ Message m = createMessageWithRefs(1, queue);
+
+ pm.addMessage(m);
+
+ assertMessageInStore(m, queue);
+ }
+
+ public void testDeleteReference() throws Exception
+ {
+ Queue queue = new QueueImpl(67);
+
+ Message m = createMessageWithRefs(1, queue);
+
+ List<MessageReference> refs = new ArrayList<MessageReference>(m.getReferences());
+
+
+ pm.addMessage(m);
+
+ assertMessageInStore(m, queue);
+
+
+ pm.deleteReference(refs.get(2));
+
+ assertMessageInStore(m, queue);
+
+ assertEquals(3, m.getReferences().size());
+
+ assertTrue(m.getReferences().contains(refs.get(0)));
+ assertTrue(m.getReferences().contains(refs.get(1)));
+ assertTrue(m.getReferences().contains(refs.get(3)));
+
+
+ pm.deleteReference(refs.get(1));
+
+ assertMessageInStore(m, queue);
+
+ assertEquals(2, m.getReferences().size());
+
+ assertTrue(m.getReferences().contains(refs.get(0)));
+ assertTrue(m.getReferences().contains(refs.get(3)));
+
+
+
+ pm.deleteReference(refs.get(3));
+
+ assertMessageInStore(m, queue);
+
+ assertEquals(1, m.getReferences().size());
+
+ assertTrue(m.getReferences().contains(refs.get(0)));
+
+
+ pm.deleteReference(refs.get(0));
+
+ assertMessageNotInStore(m);
+
+ assertStoreEmpty();
+ }
+
+ public void testCommitTransaction() throws Exception
+ {
+ List<Message> msgs = new ArrayList<Message>();
+
+ Queue queue = new QueueImpl(67);
+
+ Message m1 = createMessageWithRefs(1, queue);
+ List<MessageReference> m1Refs = new ArrayList<MessageReference>(m1.getReferences());
+
+ msgs.add(m1);
+
+ Message m2 = createMessageWithRefs(2, queue);
+
+ msgs.add(m2);
+
+ Message m3 = createMessageWithRefs(3, queue);
+ List<MessageReference> m3Refs = new ArrayList<MessageReference>(m3.getReferences());
+
+ msgs.add(m3);
+
+ pm.commitTransaction(msgs, null);
+
+ assertMessageInStore(m1, queue);
+
+ assertMessageInStore(m2, queue);
+
+ assertMessageInStore(m3, queue);
+
+
+ //Add a couple more
+
+ List<Message> msgsMore = new ArrayList<Message>();
+
+ Message m4 = createMessageWithRefs(4, queue);
+ msgsMore.add(m4);
+
+ Message m5 = createMessageWithRefs(5, queue);
+ msgsMore.add(m5);
+
+ //Delete some refs
+
+ List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
+
+ refsToRemove.add(m1.getReferences().get(0));
+ refsToRemove.add(m1.getReferences().get(3));
+
+ refsToRemove.add(m2.getReferences().get(0));
+ refsToRemove.add(m2.getReferences().get(1));
+ refsToRemove.add(m2.getReferences().get(2));
+ refsToRemove.add(m2.getReferences().get(3));
+
+ refsToRemove.add(m3.getReferences().get(2));
+
+ pm.commitTransaction(msgsMore, refsToRemove);
+
+ assertMessageInStore(m1, queue);
+ assertEquals(2, m1.getReferences().size());
+ assertTrue(m1.getReferences().contains(m1Refs.get(1)));
+ assertTrue(m1.getReferences().contains(m1Refs.get(2)));
+
+ assertMessageNotInStore(m2);
+
+ assertMessageInStore(m3, queue);
+ assertEquals(3, m3.getReferences().size());
+ assertTrue(m3.getReferences().contains(m3Refs.get(0)));
+ assertTrue(m3.getReferences().contains(m3Refs.get(1)));
+ assertTrue(m3.getReferences().contains(m3Refs.get(3)));
+
+ assertMessageInStore(m4, queue);
+ assertEquals(4, m4.getReferences().size());
+
+ assertMessageInStore(m5, queue);
+ assertEquals(4, m5.getReferences().size());
+
+ //Delete the rest
+ refsToRemove.clear();
+ refsToRemove.addAll(m1.getReferences());
+ refsToRemove.addAll(m3.getReferences());
+ refsToRemove.addAll(m4.getReferences());
+ refsToRemove.addAll(m5.getReferences());
+
+ pm.commitTransaction(null, refsToRemove);
+
+ assertMessageNotInStore(m1);
+ assertMessageNotInStore(m2);
+ assertMessageNotInStore(m4);
+ assertMessageNotInStore(m5);
+ assertMessageNotInStore(m5);
+
+ //try with nulls
+ pm.commitTransaction(null, null);
+
+ }
+
+ public void testPrepareAndCommitTransaction() throws Exception
+ {
+ List<Message> msgs = new ArrayList<Message>();
+
+ Queue queue = new QueueImpl(67);
+
+ Message m1 = createMessageWithRefs(1, queue);
+ List<MessageReference> m1Refs = new ArrayList<MessageReference>(m1.getReferences());
+
+ msgs.add(m1);
+
+ Message m2 = createMessageWithRefs(2, queue);
+
+ msgs.add(m2);
+
+ Message m3 = createMessageWithRefs(3, queue);
+ List<MessageReference> m3Refs = new ArrayList<MessageReference>(m3.getReferences());
+
+ msgs.add(m3);
+
+ pm.commitTransaction(msgs, null);
+
+ assertMessageInStore(m1, queue);
+
+ assertMessageInStore(m2, queue);
+
+ assertMessageInStore(m3, queue);
+
+
+ //Add a couple more
+
+ List<Message> msgsMore = new ArrayList<Message>();
+
+ Message m4 = createMessageWithRefs(4, queue);
+ msgsMore.add(m4);
+
+ Message m5 = createMessageWithRefs(5, queue);
+
+ msgsMore.add(m5);
+
+ //Delete some refs
+
+ List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
+
+ refsToRemove.add(m1.getReferences().get(0));
+ refsToRemove.add(m1.getReferences().get(3));
+
+ refsToRemove.add(m2.getReferences().get(0));
+ refsToRemove.add(m2.getReferences().get(1));
+ refsToRemove.add(m2.getReferences().get(2));
+ refsToRemove.add(m2.getReferences().get(3));
+
+ refsToRemove.add(m3.getReferences().get(2));
+
+ Xid xid = generateXid();
+
+ pm.prepareTransaction(xid, msgsMore, refsToRemove);
+
+ pm.commitPreparedTransaction(xid);
+
+ assertMessageInStore(m1, queue);
+ assertEquals(2, m1.getReferences().size());
+ assertTrue(m1.getReferences().contains(m1Refs.get(1)));
+ assertTrue(m1.getReferences().contains(m1Refs.get(2)));
+
+ assertMessageNotInStore(m2);
+
+ assertMessageInStore(m3, queue);
+ assertEquals(3, m3.getReferences().size());
+ assertTrue(m3.getReferences().contains(m3Refs.get(0)));
+ assertTrue(m3.getReferences().contains(m3Refs.get(1)));
+ assertTrue(m3.getReferences().contains(m3Refs.get(3)));
+
+ assertMessageInStore(m4, queue);
+ assertEquals(4, m4.getReferences().size());
+
+ assertMessageInStore(m5, queue);
+ assertEquals(4, m5.getReferences().size());
+
+ //Delete the rest
+ refsToRemove.clear();
+ refsToRemove.addAll(m1.getReferences());
+ refsToRemove.addAll(m3.getReferences());
+ refsToRemove.addAll(m4.getReferences());
+ refsToRemove.addAll(m5.getReferences());
+
+ xid = generateXid();
+
+ pm.prepareTransaction(xid, null, refsToRemove);
+
+ pm.commitPreparedTransaction(xid);
+
+ assertMessageNotInStore(m1);
+ assertMessageNotInStore(m2);
+ assertMessageNotInStore(m4);
+ assertMessageNotInStore(m5);
+ assertMessageNotInStore(m5);
+
+ //try with nulls
+ xid = generateXid();
+ pm.prepareTransaction(xid, null, null);
+ pm.commitPreparedTransaction(xid);
+
+ }
+
+ public void testPrepareAndUnprepareTransaction() throws Exception
+ {
+ List<Message> msgs = new ArrayList<Message>();
+
+ Queue queue = new QueueImpl(67);
+
+ Message m1 = createMessageWithRefs(1, queue);
+
+ msgs.add(m1);
+
+ Message m2 = createMessageWithRefs(2, queue);
+
+ msgs.add(m2);
+
+ Message m3 = createMessageWithRefs(3, queue);
+
+ msgs.add(m3);
+
+ pm.commitTransaction(msgs, null);
+
+ assertMessageInStore(m1, queue);
+
+ assertMessageInStore(m2, queue);
+
+ assertMessageInStore(m3, queue);
+
+
+ //Add a couple more
+
+ List<Message> msgsMore = new ArrayList<Message>();
+
+ Message m4 = createMessageWithRefs(4, queue);
+ msgsMore.add(m4);
+
+ Message m5 = createMessageWithRefs(5, queue);
+ msgsMore.add(m5);
+
+ //Delete some refs
+
+ List<MessageReference> refsToRemove = new ArrayList<MessageReference>();
+
+ refsToRemove.add(m1.getReferences().get(0));
+ refsToRemove.add(m1.getReferences().get(3));
+
+ refsToRemove.add(m2.getReferences().get(0));
+ refsToRemove.add(m2.getReferences().get(1));
+ refsToRemove.add(m2.getReferences().get(2));
+ refsToRemove.add(m2.getReferences().get(3));
+
+ refsToRemove.add(m3.getReferences().get(2));
+
+ Xid xid = generateXid();
+
+ pm.prepareTransaction(xid, msgsMore, refsToRemove);
+
+ pm.unprepareTransaction(xid, msgsMore, refsToRemove);
+
+ assertNumMessagesInStore(3);
+ }
+
+ public void testUpdateDeliveryCount() throws Exception
+ {
+ Queue queue = new QueueImpl(67);
+
+ Message m1 = createMessageWithRefs(1, queue);
+
+ assertEquals(0, m1.getReferences().get(0).getDeliveryCount());
+ assertEquals(0, m1.getReferences().get(1).getDeliveryCount());
+ assertEquals(0, m1.getReferences().get(2).getDeliveryCount());
+ assertEquals(0, m1.getReferences().get(3).getDeliveryCount());
+
+ pm.addMessage(m1);
+
+ final int delCount = 77;
+ m1.getReferences().get(1).setDeliveryCount(delCount);
+ pm.updateDeliveryCount(queue, m1.getReferences().get(1));
+
+ final int delCount2 = 423;
+
+ m1.getReferences().get(3).setDeliveryCount(delCount2);
+ pm.updateDeliveryCount(queue, m1.getReferences().get(3));
+
+ assertMessageInStore(m1, queue);
+ }
+
+ public void testRefsWithDifferentQueues() throws Exception
+ {
+ final int numQueues = 10;
+
+ List<Message> msgs = new ArrayList<Message>();
+
+ for (int i = 0; i < numQueues; i++)
+ {
+ Queue queue = new QueueImpl(i);
+
+ MessageReference ref = generateReference(queue, i);
+
+ msgs.add(ref.getMessage());
+
+ pm.addMessage(ref.getMessage());
+
+ assertEquals(queue, ref.getQueue());
+ }
+
+ for (Message msg: msgs)
+ {
+ assertMessageInStore(msg, msg.getReferences().get(0).getQueue());
+ }
+ }
+
+ public void testLoadQueues() throws Exception
+ {
+ Map<Long, Queue> queues = new HashMap<Long, Queue>();
+
+ final int numQueues = 10;
+
+ final int numMessages = 10;
+
+ for (int i = 0; i < numQueues; i++)
+ {
+ Queue queue = new QueueImpl(i);
+
+ queues.put(queue.getID(), queue);
+ }
+
+ List<Message> msgs = new ArrayList<Message>();
+
+ for (int i = 0; i < numMessages; i++)
+ {
+ Message msg = this.generateMessage(i);
+
+ msgs.add(msg);
+
+ for (long j = 0; j < numQueues; j++)
+ {
+ Queue queue = queues.get(j);
+
+ msg.createReference(queue);
+ }
+
+ pm.addMessage(msg);
+ }
+
+
+ pm.loadQueues(queues);
+
+ for (Queue queue: queues.values())
+ {
+ assertEquals(numMessages, queue.getMessageCount());
+
+ List<MessageReference> refs = queue.list(null);
+
+ int i = 0;
+ for (MessageReference ref: refs)
+ {
+ this.assertEquivalent(msgs.get(i++), ref.getMessage());
+ }
+ }
+ }
+
+ public void testGetInDoubtXids() throws Exception
+ {
+ Queue queue = new QueueImpl(12);
+
+ Message message1 = createMessageWithRefs(1, queue);
+
+ List<Message> msgs = new ArrayList<Message>();
+
+ msgs.add(message1);
+
+ Xid xid1 = generateXid();
+
+ pm.prepareTransaction(xid1, msgs, null);
+
+ pm.setInRecoveryMode(true);
+
+ List<Xid> xids = pm.getInDoubtXids();
+
+ assertNotNull(xids);
+
+ assertEquals(1, xids.size());
+
+ assertEquals(xid1, xids.get(0));
+
+
+
+ Message message2 = createMessageWithRefs(2, queue);
+
+ msgs.clear();
+
+ msgs.add(message2);
+
+ Xid xid2 = generateXid();
+
+ pm.prepareTransaction(xid2, msgs, null);
+
+ xids = pm.getInDoubtXids();
+
+ assertNotNull(xids);
+
+ assertEquals(2, xids.size());
+
+ assertTrue(xids.contains(xid1));
+
+ assertTrue(xids.contains(xid2));
+
+
+ pm.commitPreparedTransaction(xid1);
+
+ pm.commitPreparedTransaction(xid2);
+
+ xids = pm.getInDoubtXids();
+
+ assertNotNull(xids);
+
+ assertEquals(0, xids.size());
+ }
+
+ public void testGetInDoubtXidsWithRestart() throws Exception
+ {
+ Queue queue = new QueueImpl(12);
+
+ Message message1 = createMessageWithRefs(1, queue);
+
+ List<Message> msgs = new ArrayList<Message>();
+
+ msgs.add(message1);
+
+ Xid xid1 = generateXid();
+
+ pm.prepareTransaction(xid1, msgs, null);
+
+ pm.setInRecoveryMode(true);
+
+ List<Xid> xids = pm.getInDoubtXids();
+
+ assertNotNull(xids);
+
+ assertEquals(1, xids.size());
+
+ assertEquals(xid1, xids.get(0));
+
+
+
+ Message message2 = createMessageWithRefs(2, queue);
+
+ msgs.clear();
+
+ msgs.add(message2);
+
+ Xid xid2 = generateXid();
+
+ pm.prepareTransaction(xid2, msgs, null);
+
+ xids = pm.getInDoubtXids();
+
+ assertNotNull(xids);
+
+ assertEquals(2, xids.size());
+
+ assertTrue(xids.contains(xid1));
+
+ assertTrue(xids.contains(xid2));
+
+ pm.stop();
+
+ pm.start();
+
+ pm.setInRecoveryMode(true);
+
+ xids = pm.getInDoubtXids();
+
+ assertNotNull(xids);
+
+ assertEquals(2, xids.size());
+
+ assertTrue(xids.contains(xid1));
+
+ assertTrue(xids.contains(xid2));
+
+
+ pm.commitPreparedTransaction(xid1);
+
+ pm.commitPreparedTransaction(xid2);
+
+ xids = pm.getInDoubtXids();
+
+ assertNotNull(xids);
+
+ assertEquals(0, xids.size());
+ }
+
+ public void testSetGetRecoveryMode() throws Exception
+ {
+ assertFalse(pm.isInRecoveryMode());
+
+ try
+ {
+ pm.getInDoubtXids();
+ fail("Should throw exception");
+ }
+ catch (IllegalStateException e)
+ {
+ //Ok
+ }
+
+ pm.setInRecoveryMode(true);
+
+ assertTrue(pm.isInRecoveryMode());
+
+ pm.getInDoubtXids();
+
+ pm.setInRecoveryMode(false);
+
+ assertFalse(pm.isInRecoveryMode());
+ }
+
+ // Private --------------------------------------------------------------------
+
+ private Message extractMessage(Map<Long, Queue> queues, long id, byte[] msgBytes, byte[] refBytes) throws Exception
+ {
+ ByteBuffer buffer = ByteBuffer.wrap(msgBytes);
+
+ int type = buffer.getInt();
+
+ long expiration = buffer.getLong();
+
+ long timestamp = buffer.getLong();
+
+ byte priority = buffer.get();
+
+ int headerSize = buffer.getInt();
+
+ byte[] headers = new byte[headerSize];
+
+ buffer.get(headers);
+
+ int payloadSize = buffer.getInt();
+
+ byte[] payload = null;
+
+ if (payloadSize != 0)
+ {
+ payload = new byte[payloadSize];
+
+ buffer.get(payload);
+ }
+
+ Message message = new MessageImpl(id, type, true, expiration, timestamp, priority,
+ headers, payload);
+
+ buffer = ByteBuffer.wrap(refBytes);
+
+ while (buffer.hasRemaining())
+ {
+ long queueID = buffer.getLong();
+
+ int deliveryCount = buffer.getInt();
+
+ long scheduledDeliveryTime = buffer.getLong();
+
+ MessageReference reference = message.createReference(queues.get(queueID));
+
+ reference.setDeliveryCount(deliveryCount);
+
+ reference.setScheduledDeliveryTime(scheduledDeliveryTime);
+ }
+
+ return message;
+ }
+
+ private void assertMessageInStore(Message m, Queue queue) throws Exception
+ {
+ BDBJEDatabase msgDB = bdb.getDatabase(BDBJEPersistenceManager.MESSAGE_DB_NAME);
+
+ BDBJEDatabase refDB = bdb.getDatabase(BDBJEPersistenceManager.REFERENCE_DB_NAME);
+
+ byte[] msgBytes = msgDB.get(m.getMessageID());
+
+ assertNotNull(msgBytes);
+
+ byte[] refBytes = refDB.get(m.getMessageID());
+
+ assertNotNull(refBytes);
+
+ Map<Long, Queue> queues = new HashMap<Long, Queue>();
+
+ queues.put(queue.getID(), queue);
+
+ Message m2 = extractMessage(queues, m.getMessageID(), msgBytes, refBytes);
+
+ assertEquivalent(m, m2);
+ }
+
+ private void assertNumMessagesInStore(int num) throws Exception
+ {
+ BDBJEDatabase msgDB = bdb.getDatabase(BDBJEPersistenceManager.MESSAGE_DB_NAME);
+
+ assertEquals(num, msgDB.size());
+ }
+
+ private void assertMessageNotInStore(Message m) throws Exception
+ {
+ BDBJEDatabase msgDB = bdb.getDatabase(BDBJEPersistenceManager.MESSAGE_DB_NAME);
+
+ BDBJEDatabase refDB = bdb.getDatabase(BDBJEPersistenceManager.REFERENCE_DB_NAME);
+
+
+ byte[] msgBytes = msgDB.get(m.getMessageID());
+
+ assertNull(msgBytes);
+
+ byte[] refBytes = refDB.get(m.getMessageID());
+
+ assertNull(refBytes);
+ }
+
+ private void assertStoreEmpty() throws Exception
+ {
+ BDBJEDatabase msgDB = bdb.getDatabase(BDBJEPersistenceManager.MESSAGE_DB_NAME);
+
+ BDBJEDatabase refDB = bdb.getDatabase(BDBJEPersistenceManager.REFERENCE_DB_NAME);
+
+ assertEquals(0, msgDB.size());
+
+ assertEquals(0, refDB.size());
+ }
+
+ private Message createMessageWithRefs(long id, Queue queue)
+ {
+ Message m = generateMessage(id);
+
+ m.createReference(queue);
+
+ m.createReference(queue);
+
+ m.createReference(queue);
+
+ m.createReference(queue);
+
+ return m;
+ }
+
+
+ // Inner classes ---------------------------------------------------------------
+
+}
More information about the jboss-cvs-commits
mailing list