[rhmessaging-commits] rhmessaging commits: r2040 - in store/branches/java/broker-queue-refactor/java/bdbstore: mvn-repo/sleepycat/berkeleydb-je and 3 other directories.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Tue May 13 06:36:27 EDT 2008
Author: godfrer
Date: 2008-05-13 06:36:26 -0400 (Tue, 13 May 2008)
New Revision: 2040
Added:
store/branches/java/broker-queue-refactor/java/bdbstore/mvn-repo/sleepycat/berkeleydb-je/3.2.76/
store/branches/java/broker-queue-refactor/java/bdbstore/mvn-repo/sleepycat/berkeleydb-je/3.2.76/berkeleydb-je-3.2.76.jar
Modified:
store/branches/java/broker-queue-refactor/java/bdbstore/pom.xml
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
Log:
Upgrade BDB version, make store work with changes to broker
Added: store/branches/java/broker-queue-refactor/java/bdbstore/mvn-repo/sleepycat/berkeleydb-je/3.2.76/berkeleydb-je-3.2.76.jar
===================================================================
(Binary files differ)
Property changes on: store/branches/java/broker-queue-refactor/java/bdbstore/mvn-repo/sleepycat/berkeleydb-je/3.2.76/berkeleydb-je-3.2.76.jar
___________________________________________________________________
Name: svn:mime-type
+ application/octet-stream
Modified: store/branches/java/broker-queue-refactor/java/bdbstore/pom.xml
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/pom.xml 2008-05-13 09:29:57 UTC (rev 2039)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/pom.xml 2008-05-13 10:36:26 UTC (rev 2040)
@@ -32,11 +32,11 @@
<version>1.0-incubating-M2.1-SNAPSHOT</version>
</parent>
- <!-- Local repository for the BerkelyDB-je so we don't have to use the installer script -->
+ <!-- Local repository for the BerkeleyDB-je so we don't have to use the installer script -->
<repositories>
<repository>
- <id>berkley-je.local</id>
- <name>Local BerkelyDB JE Repository</name>
+ <id>berkeley-je.local</id>
+ <name>Local BerkeleyDB JE Repository</name>
<url>file://${basedir}/mvn-repo</url>
</repository>
</repositories>
@@ -61,7 +61,7 @@
<dependency>
<groupId>sleepycat</groupId>
<artifactId>berkeleydb-je</artifactId>
- <version>3.2.42</version>
+ <version>3.2.76</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2008-05-13 09:29:57 UTC (rev 2039)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2008-05-13 10:36:26 UTC (rev 2040)
@@ -40,6 +40,7 @@
import org.apache.qpid.server.queue.MessageHandleFactory;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.txn.NonTransactionalContext;
@@ -57,6 +58,7 @@
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
/**
* BDBMessageStore implements a persistent {@link MessageStore} using the BDB high performance log.
@@ -457,7 +459,7 @@
.getQueueName() + ", Routing Key: " + binding.getRoutingKey() + ", Arguments: " + binding.getArguments()
+ ")");
- queue.bind(binding.getRoutingKey(), binding.getArguments(), exchange);
+ queue.bind(exchange, binding.getRoutingKey(), binding.getArguments() );
}
}
}
@@ -636,14 +638,16 @@
/**
* Removes the specified queue from the persistent store.
*
- * @param name The queue to remove.
- *
+ * @param queue The queue to remove.
* @throws AMQException If the operation fails for any reason.
*/
- public void removeQueue(AMQShortString name) throws AMQException
+ public void removeQueue(final AMQQueue queue) throws AMQException
{
+ AMQShortString name = queue.getName();
+
_log.debug("public void removeQueue(AMQShortString name = " + name + "): called");
+
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = new AMQShortStringTupleBinding();
keyBinding.objectToEntry(name, key);
@@ -693,16 +697,17 @@
* Places a message onto a specified queue, in a given transactional context.
*
* @param context The transactional context for the operation.
- * @param name The name of the queue to place the message on.
+ * @param queue The queue to place the message on.
* @param messageId The message to enqueue.
*
* @throws AMQException If the operation fails for any reason.
*/
- public void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException
+ public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
{
// _log.debug("public void enqueueMessage(StoreContext context = " + context + ", AMQShortString name = " + name
// + ", Long messageId): called");
+ AMQShortString name = queue.getName();
Transaction tx = (Transaction) context.getPayload();
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = new DeliveryDetailsKey.TupleBinding();
@@ -753,13 +758,13 @@
* Extracts a message from a specified queue, in a given transactional context.
*
* @param context The transactional context for the operation.
- * @param name The name of the queue to take the message from.
+ * @param queue The queue to take the message from.
* @param messageId The message to dequeue.
- *
- * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
+ * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
*/
- public void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException
+ public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
{
+ AMQShortString name = queue.getName();
boolean isLocal = getOrCreateTransaction(context);
Transaction tx = (Transaction) context.getPayload();
@@ -1089,7 +1094,7 @@
_log.debug("public void storeMessageMetaData(StoreContext context = " + context + ", Long messageId = "
+ messageId + ", MessageMetaData messageMetaData = " + messageMetaData + "): called");
}
-
+ getOrCreateTransaction(context);
Transaction tx = (Transaction) context.getPayload();
DatabaseEntry key = new DatabaseEntry();
EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
@@ -1243,7 +1248,7 @@
public void process() throws AMQException
{
- _queue.process(_context, _queue.createEntry(_message), false);
+ _queue.enqueue(_context, _message);
}
}
@@ -1269,7 +1274,7 @@
MessageHandleFactory messageHandleFactory = new MessageHandleFactory();
long maxId = 1;
- TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null, null);
+ TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null);
while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
{
@@ -1281,7 +1286,7 @@
AMQQueue queue = queues.get(queueName);
if (queue == null)
{
- queue = new AMQQueue(queueName, false, null, false, _virtualHost);
+ queue = AMQQueueFactory.createAMQQueueImpl(queueName, false, null, false, _virtualHost, null);
_virtualHost.getQueueRegistry().registerQueue(queue);
queues.put(queueName, queue);
}
@@ -1439,9 +1444,9 @@
{
// _log.debug("public void commit(): called");
+ _commitThread.addJob(this);
synchronized (this)
{
- _commitThread.addJob(this);
while (!_complete)
{
try
@@ -1478,8 +1483,10 @@
// private final Logger _log = Logger.getLogger(CommitThread.class);
private final AtomicBoolean _stopped = new AtomicBoolean(false);
- private final Queue<Commit> _jobQueue = new ConcurrentLinkedQueue<Commit>();
+ private final AtomicReference<Queue<Commit>> _jobQueue = new AtomicReference<Queue<Commit>>(new ConcurrentLinkedQueue<Commit>());
private final CheckpointConfig _config = new CheckpointConfig();
+ private final Object _lock = new Object();
+ private AtomicBoolean _hasJobs = new AtomicBoolean(false);
public CommitThread(String name)
{
@@ -1494,15 +1501,15 @@
{
try
{
- synchronized (this)
+ synchronized (_lock)
{
while (!_stopped.get() && !hasJobs())
{
- wait();
+ _lock.wait();
}
+ }
+ processJobs();
- processJobs();
- }
}
catch (InterruptedException e)
{
@@ -1515,33 +1522,27 @@
{
// _log.debug("private void processJobs(): called");
- List<Commit> jobs = new LinkedList<Commit>();
+ // we replace the old queue atomically with a new one and this avoids any need to
+ // copy elements out of the queue
+ Queue<Commit> jobs = _jobQueue.getAndSet(new ConcurrentLinkedQueue<Commit>());
- Commit job;
-
- while ((job = _jobQueue.poll()) != null)
- {
- jobs.add(job);
- }
-
try
{
// _environment.checkpoint(_config);
_environment.sync();
-
- // _log.info("Commited " + jobs.size() + " jobs.");
-
- /* Iterator<Commit> iter = jobs.iterator();
- while(iter.hasNext())
- {
- Commit commit = iter.next();
- commit.prepare(!iter.hasNext());
- }
- */
+
for (Commit commit : jobs)
{
commit.complete();
}
+ if(_jobQueue.get().isEmpty())
+ {
+ _hasJobs.set(false);
+ if(!_jobQueue.get().isEmpty())
+ {
+ _hasJobs.set(true);
+ }
+ }
}
catch (DatabaseException e)
@@ -1556,19 +1557,28 @@
private boolean hasJobs()
{
- return !_jobQueue.isEmpty();
+ return !_jobQueue.get().isEmpty();
}
- public synchronized void addJob(Commit commit)
+ public void addJob(Commit commit)
{
- _jobQueue.add(commit);
- notify();
+ _jobQueue.get().add(commit);
+ if(_hasJobs.compareAndSet(false, true))
+ {
+ synchronized(_lock)
+ {
+ _lock.notifyAll();
+ }
+ }
}
- public synchronized void close()
+ public void close()
{
_stopped.set(true);
- notify();
+ synchronized(_lock)
+ {
+ _lock.notifyAll();
+ }
}
}
}
Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java 2008-05-13 09:29:57 UTC (rev 2039)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java 2008-05-13 10:36:26 UTC (rev 2040)
@@ -32,7 +32,7 @@
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
@@ -42,7 +42,6 @@
import java.io.File;
import java.util.LinkedList;
import java.util.List;
-import java.util.HashSet;
public class BDBStoreTest extends TestCase
{
@@ -84,8 +83,7 @@
_store.setVirtualHost(_virtualHost);
_store.startCommitThread();
- _txnContext = new NonTransactionalContext(_store, _storeContext, null, new LinkedList<RequiredDeliveryException>(),
- new HashSet<Long>());
+ _txnContext = new NonTransactionalContext(_store, _storeContext, null, new LinkedList<RequiredDeliveryException>());
}
public void tearDown() throws Exception
@@ -95,7 +93,7 @@
public void testQueuePersistence() throws DatabaseException, AMQException
{
- AMQQueue queue = new AMQQueue(QUEUE1, true, ME, false, _virtualHost);
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
_store.createQueue(queue);
AMQQueue returnedQueue = _store.getQueue(QUEUE1);
@@ -262,12 +260,12 @@
_store.storeMessageMetaData(_storeContext, 22L, new MessageMetaData(pubBody, chb, 0));
- AMQQueue queue = new AMQQueue(QUEUE1, true, ME, false, _virtualHost);
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
_store.createQueue(queue);
_store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, QUEUE1, 20L);
- _store.enqueueMessage(_storeContext, QUEUE1, 21L);
+ _store.enqueueMessage(_storeContext, queue, 20L);
+ _store.enqueueMessage(_storeContext, queue, 21L);
_store.commitTran(_storeContext);
List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
@@ -291,17 +289,17 @@
_store.storeMessageMetaData(_storeContext, 31L, new MessageMetaData(pubBody, chb, 0));
_store.storeMessageMetaData(_storeContext, 32L, new MessageMetaData(pubBody, chb, 0));
- AMQQueue queue = new AMQQueue(QUEUE1, true, ME, false, _virtualHost);
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
_store.createQueue(queue);
_store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, QUEUE1, 30L);
- _store.enqueueMessage(_storeContext, QUEUE1, 31L);
+ _store.enqueueMessage(_storeContext, queue, 30L);
+ _store.enqueueMessage(_storeContext, queue, 31L);
_store.commitTran(_storeContext);
_store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, QUEUE1, 32L);
+ _store.enqueueMessage(_storeContext, queue, 32L);
_store.abortTran(_storeContext);
_store.beginTran(_storeContext);
@@ -329,16 +327,16 @@
_store.storeMessageMetaData(_storeContext, 32L, new MessageMetaData(pubBody, chb, 0));
- AMQQueue queue = new AMQQueue(QUEUE1, true, ME, false, _virtualHost);
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
_store.createQueue(queue);
_store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, QUEUE1, 30L);
+ _store.enqueueMessage(_storeContext, queue, 30L);
_store.abortTran(_storeContext);
_store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, QUEUE1, 31L);
- _store.enqueueMessage(_storeContext, QUEUE1, 32L);
+ _store.enqueueMessage(_storeContext, queue, 31L);
+ _store.enqueueMessage(_storeContext, queue, 32L);
_store.commitTran(_storeContext);
List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
@@ -363,19 +361,19 @@
_store.storeMessageMetaData(_storeContext, 42L, new MessageMetaData(pubBody, chb, 0));
- AMQQueue queue = new AMQQueue(QUEUE1, true, ME, false, _virtualHost);
- AMQQueue queue2 = new AMQQueue(QUEUE2, true, HIM, false, _virtualHost);
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
+ AMQQueue queue2 = AMQQueueFactory.createAMQQueueImpl(QUEUE2, true, HIM, false, _virtualHost, null);
_store.createQueue(queue);
_store.createQueue(queue2);
_store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, QUEUE1, 40L);
- _store.enqueueMessage(_storeContext, QUEUE1, 41L);
- _store.enqueueMessage(_storeContext, QUEUE2, 42L);
+ _store.enqueueMessage(_storeContext, queue, 40L);
+ _store.enqueueMessage(_storeContext, queue, 41L);
+ _store.enqueueMessage(_storeContext, queue2, 42L);
_store.commitTran(_storeContext);
- _store.enqueueMessage(_storeContext, QUEUE1, 42L);
+ _store.enqueueMessage(_storeContext, queue, 42L);
_virtualHost.getQueueRegistry().unregisterQueue(queue.getName());
_virtualHost.getQueueRegistry().unregisterQueue(queue2.getName());
@@ -421,21 +419,21 @@
_store.storeMessageMetaData(_storeContext, 50L, new MessageMetaData(pubBody, chb, 0));
- AMQQueue queue = new AMQQueue(QUEUE1, true, ME, false, _virtualHost);
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
_store.createQueue(queue);
- _store.enqueueMessage(_storeContext, QUEUE1, 50L);
- _store.dequeueMessage(_storeContext, QUEUE1, 50L);
+ _store.enqueueMessage(_storeContext, queue, 50L);
+ _store.dequeueMessage(_storeContext, queue, 50L);
}
public void testQueueRemove() throws AMQException
{
- AMQQueue queue = new AMQQueue(QUEUE1, true, ME, false, _virtualHost);
+ AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
_store.createQueue(queue);
- _store.removeQueue(QUEUE1);
+ _store.removeQueue(queue);
try
{
- _store.removeQueue(QUEUE1);
+ _store.removeQueue(queue);
Assert.fail("No exception thrown when deleting non-existant queue");
}
catch (AMQException e)
More information about the rhmessaging-commits
mailing list