Author: ritchiem
Date: 2009-02-13 06:41:24 -0500 (Fri, 13 Feb 2009)
New Revision: 3117
Modified:
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
Log:
Updates based on QPID-1628, QPID-1629 that remove the MessageHandles and the factory
Modified:
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
---
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2009-02-11
13:45:15 UTC (rev 3116)
+++
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2009-02-13
11:41:24 UTC (rev 3117)
@@ -23,13 +23,20 @@
import com.sleepycat.bind.EntryBinding;
import com.sleepycat.bind.tuple.ByteBinding;
import com.sleepycat.bind.tuple.TupleBinding;
-
-import com.sleepycat.je.*;
-
+import com.sleepycat.je.CheckpointConfig;
+import com.sleepycat.je.Cursor;
+import com.sleepycat.je.Database;
+import com.sleepycat.je.DatabaseConfig;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Environment;
+import com.sleepycat.je.EnvironmentConfig;
+import com.sleepycat.je.LockMode;
+import com.sleepycat.je.OperationStatus;
+import com.sleepycat.je.Transaction;
+import com.sleepycat.je.TransactionConfig;
import org.apache.commons.configuration.Configuration;
-
import org.apache.log4j.Logger;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -37,15 +44,17 @@
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MessageHandleFactory;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.MessageFactory;
import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.server.queue.PersistentAMQMessage;
import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.TransientAMQMessage;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.berkeleydb.tuples.QueueTupleBindingFactory;
import org.apache.qpid.server.store.berkeleydb.tuples.BindingTupleBindingFactory;
import org.apache.qpid.server.store.berkeleydb.tuples.QueueTuple;
+import org.apache.qpid.server.store.berkeleydb.tuples.QueueTupleBindingFactory;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -58,8 +67,8 @@
import java.util.Map;
import java.util.Queue;
import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -126,8 +135,11 @@
private QueueTupleBindingFactory _queueTupleBindingFactory;
private BindingTupleBindingFactory _bindingTupleBindingFactory;
+ MessageFactory _messageFactory;
+
/** The data version this store should run with */
private int _version;
+
private enum State
{
INITIAL,
@@ -220,6 +232,8 @@
boolean newEnvironment = setupStore(environmentPath, readonly);
+ _messageFactory = MessageFactory.getInstance();
+
// Performing recovery when we only want read access will cause all the broker
objects to be recreated
// This will/may include thread pool creations that may be duplicated when
manually inspecting the state of
// the store. Simplest solution is to prevent the initial creation of the state
by blocking recovery.
@@ -340,10 +354,10 @@
envConfig.setTransactional(true);
envConfig.setConfigParam("je.lock.nLockTables", "7");
envConfig.setLockTimeout(15000);
-
+
// Set transaction mode
_transactionConfig.setReadCommitted(true);
-
+
//This prevents background threads running which will potentially update the
store.
envConfig.setReadOnly(readonly);
try
@@ -357,7 +371,7 @@
{
//Allow the creation this time
envConfig.setAllowCreate(true);
- if (_environment != null )
+ if (_environment != null)
{
_environment.cleanLog();
_environment.close();
@@ -648,7 +662,7 @@
{
_log.info("Restoring binding: (Exchange: " +
binding.getExchangeName() + ", Queue: " + binding
.getQueueName() + ", Routing Key: " +
binding.getRoutingKey() + ", Arguments: " + binding.getArguments()
- + ")");
+ + ")");
queue.bind(exchange, binding.getRoutingKey(), binding.getArguments());
}
@@ -992,7 +1006,7 @@
{
_log.debug("Message Id: " + messageId + " Dequeue");
}
-
+
try
{
@@ -1000,7 +1014,7 @@
if (status == OperationStatus.NOTFOUND)
{
throw new AMQException("Unable to find message with id " +
messageId + " on queue " + name);
- }
+ }
else if (status != OperationStatus.SUCCESS)
{
throw new AMQException("Unable to remove message with id " +
messageId + " on queue " + name);
@@ -1065,7 +1079,7 @@
{
try
{
- context.setPayload(_environment.beginTransaction(null, _transactionConfig
));
+ context.setPayload(_environment.beginTransaction(null,
_transactionConfig));
}
catch (DatabaseException e)
{
@@ -1437,6 +1451,16 @@
return true;
}
+ public void flow(Long messageId)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void recover(Long messageId)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
Map<AMQShortString, AMQQueue> loadQueues() throws DatabaseException,
AMQException
{
Cursor cursor = null;
@@ -1598,7 +1622,7 @@
private void deliverMessages(final StoreContext context, Map<AMQShortString,
AMQQueue> queues)
throws DatabaseException, AMQException
{
- Map<Long, AMQMessage> msgMap = new HashMap<Long, AMQMessage>();
+ Map<Long, PersistentAMQMessage> msgMap = new HashMap<Long,
PersistentAMQMessage>();
List<ProcessAction> actions = new ArrayList<ProcessAction>();
Map<AMQShortString, Integer> queueRecoveries = new
TreeMap<AMQShortString, Integer>();
@@ -1613,7 +1637,6 @@
DatabaseEntry value = new DatabaseEntry();
EntryBinding valueBinding = TupleBinding.getPrimitiveBinding(Long.class);
- MessageHandleFactory messageHandleFactory = new MessageHandleFactory();
long maxId = 1;
TransactionalContext txnContext = new NonTransactionalContext(this, new
StoreContext(), null, null);
@@ -1635,7 +1658,7 @@
long messageId = dd.messageId;
maxId = Math.max(maxId, messageId);
- AMQMessage message = msgMap.get(messageId);
+ PersistentAMQMessage message = msgMap.get(messageId);
if (message != null)
{
@@ -1643,8 +1666,23 @@
}
else
{
- message = new AMQMessage(messageId, this, messageHandleFactory,
txnContext);
+
+ message = (PersistentAMQMessage)
_messageFactory.createMessage(messageId, this, true);
msgMap.put(messageId, message);
+
+ MessageMetaData mmd = getMessageMetaData(context, messageId);
+ //Recover HeaderBody
+ message.recoverFromMessageMetaData(mmd);
+
+ //Recover Bodies
+ int count = mmd.getContentChunkCount();
+
+ for (int index = 0; index < count; index++)
+ {
+ ContentChunk cc = getContentBodyChunk(context, messageId,
index);
+ message.recoverContentBodyFrame(cc, index == count);
+ }
+
}
if (_log.isDebugEnabled())
Modified:
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java
===================================================================
---
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java 2009-02-11
13:45:15 UTC (rev 3116)
+++
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ContentTB.java 2009-02-13
11:41:24 UTC (rev 3117)
@@ -23,14 +23,10 @@
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
-import org.apache.qpid.framing.ContentBody;
+import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.mina.common.ByteBuffer;
-
-/**
- * @author Robert Greig (robert.j.greig(a)jpmorgan.com)
- */
+/** @author Robert Greig (robert.j.greig(a)jpmorgan.com) */
public class ContentTB extends TupleBinding
{
public Object entryToObject(TupleInput tupleInput)
@@ -39,7 +35,7 @@
final int size = tupleInput.readInt();
byte[] underlying = new byte[size];
tupleInput.readFast(underlying);
- final ByteBuffer data = ByteBuffer.wrap(underlying);
+ final ByteBuffer data = ByteBuffer.wrap(underlying);
ContentChunk cb = new ContentChunk()
{
@@ -70,7 +66,7 @@
ByteBuffer buf = cb.getData();
buf.duplicate().rewind().get(underlying);
-
+
tupleOutput.writeInt(size);
tupleOutput.writeFast(underlying);
}
Modified:
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java
===================================================================
---
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java 2009-02-11
13:45:15 UTC (rev 3116)
+++
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/MessageMetaDataTB.java 2009-02-13
11:41:24 UTC (rev 3117)
@@ -26,6 +26,7 @@
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.queue.MessageMetaData;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
/**
* Handles the mapping to and from message meta data
@@ -76,35 +77,7 @@
final boolean mandatory = tupleInput.readBoolean();
final boolean immediate = tupleInput.readBoolean();
- return new MessagePublishInfo()
- {
-
- public AMQShortString getExchange()
- {
- return exchange;
- }
-
- public void setExchange(AMQShortString exchange)
- {
-
- }
-
- public boolean isImmediate()
- {
- return immediate;
- }
-
- public boolean isMandatory()
- {
- return mandatory;
- }
-
- public AMQShortString getRoutingKey()
- {
- return routingKey;
- }
- } ;
-
+ return new MessagePublishInfoImpl(exchange,immediate,mandatory,routingKey);
}
Modified:
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
===================================================================
---
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java 2009-02-11
13:45:15 UTC (rev 3116)
+++
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java 2009-02-13
11:41:24 UTC (rev 3117)
@@ -19,7 +19,6 @@
import com.sleepycat.je.DatabaseException;
import junit.framework.Assert;
-import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
@@ -28,6 +27,7 @@
import org.apache.qpid.framing.*;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.DefaultExchangeFactory;
@@ -39,6 +39,7 @@
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.AMQPriorityQueue;
+import org.apache.qpid.server.queue.MockContentChunk;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
@@ -55,7 +56,7 @@
private BDBMessageStore _store;
private String STORE_LOCATION = System.getProperty("BDB_WORK") +
"/bdbTestEnv";
-
+
private StoreContext _storeContext = new StoreContext();
private VirtualHost _virtualHost;
@@ -176,36 +177,7 @@
private MessagePublishInfo createPublishBody()
{
-
- return new MessagePublishInfo()
- {
-
- public AMQShortString getExchange()
- {
- return MYEXCHANGE;
- }
-
- public void setExchange(AMQShortString exchange)
- {
-
- }
-
- public boolean isImmediate()
- {
- return false;
- }
-
- public boolean isMandatory()
- {
- return true;
- }
-
- public AMQShortString getRoutingKey()
- {
- return RK;
- }
- };
-
+ return new MessagePublishInfoImpl(MYEXCHANGE,false,true,RK);
}
private BasicContentHeaderProperties createContentHeaderProperties()
@@ -440,8 +412,10 @@
_store.storeMessageMetaData(_storeContext, 40L, new MessageMetaData(pubBody, chb,
0));
_store.storeMessageMetaData(_storeContext, 41L, new MessageMetaData(pubBody, chb,
0));
- _store.storeMessageMetaData(_storeContext, 42L, new MessageMetaData(pubBody, chb,
0));
+ _store.storeMessageMetaData(_storeContext, 42L, new MessageMetaData(pubBody, chb,
1));
+ _store.storeContentBodyChunk(_storeContext, 42L, 0, new MockContentChunk(),
true);
+
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false,
_virtualHost, null);
AMQQueue queue2 = AMQQueueFactory.createAMQQueueImpl(QUEUE2, true, HIM, false,
_virtualHost, null);