[rhmessaging-commits] rhmessaging commits: r2040 - in store/branches/java/broker-queue-refactor/java/bdbstore: mvn-repo/sleepycat/berkeleydb-je and 3 other directories.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Tue May 13 06:36:27 EDT 2008


Author: godfrer
Date: 2008-05-13 06:36:26 -0400 (Tue, 13 May 2008)
New Revision: 2040

Added:
   store/branches/java/broker-queue-refactor/java/bdbstore/mvn-repo/sleepycat/berkeleydb-je/3.2.76/
   store/branches/java/broker-queue-refactor/java/bdbstore/mvn-repo/sleepycat/berkeleydb-je/3.2.76/berkeleydb-je-3.2.76.jar
Modified:
   store/branches/java/broker-queue-refactor/java/bdbstore/pom.xml
   store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
   store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
Log:
Upgrade BDB version, make store work with changes to broker

Added: store/branches/java/broker-queue-refactor/java/bdbstore/mvn-repo/sleepycat/berkeleydb-je/3.2.76/berkeleydb-je-3.2.76.jar
===================================================================
(Binary files differ)


Property changes on: store/branches/java/broker-queue-refactor/java/bdbstore/mvn-repo/sleepycat/berkeleydb-je/3.2.76/berkeleydb-je-3.2.76.jar
___________________________________________________________________
Name: svn:mime-type
   + application/octet-stream

Modified: store/branches/java/broker-queue-refactor/java/bdbstore/pom.xml
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/pom.xml	2008-05-13 09:29:57 UTC (rev 2039)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/pom.xml	2008-05-13 10:36:26 UTC (rev 2040)
@@ -32,11 +32,11 @@
         <version>1.0-incubating-M2.1-SNAPSHOT</version>
     </parent>
 
-    <!-- Local repository for the BerkelyDB-je so we don't have to use the installer script --> 
+    <!-- Local repository for the BerkeleyDB-je so we don't have to use the installer script --> 
     <repositories>
         <repository>
-            <id>berkley-je.local</id>
-            <name>Local BerkelyDB JE Repository</name>
+            <id>berkeley-je.local</id>
+            <name>Local BerkeleyDB JE Repository</name>
             <url>file://${basedir}/mvn-repo</url>
         </repository>
     </repositories>
@@ -61,7 +61,7 @@
         <dependency>
             <groupId>sleepycat</groupId>
             <artifactId>berkeleydb-je</artifactId>
-            <version>3.2.42</version>
+            <version>3.2.76</version>
         </dependency>
         <dependency>
             <groupId>log4j</groupId>

Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java	2008-05-13 09:29:57 UTC (rev 2039)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java	2008-05-13 10:36:26 UTC (rev 2040)
@@ -40,6 +40,7 @@
 import org.apache.qpid.server.queue.MessageHandleFactory;
 import org.apache.qpid.server.queue.MessageMetaData;
 import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.store.MessageStore;
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.txn.NonTransactionalContext;
@@ -57,6 +58,7 @@
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * BDBMessageStore implements a persistent {@link MessageStore} using the BDB high performance log.
@@ -457,7 +459,7 @@
                     .getQueueName() + ", Routing Key: " + binding.getRoutingKey() + ", Arguments: " + binding.getArguments()
                     + ")");
 
-                queue.bind(binding.getRoutingKey(), binding.getArguments(), exchange);
+                queue.bind(exchange, binding.getRoutingKey(), binding.getArguments() );
             }
         }
     }
@@ -636,14 +638,16 @@
     /**
      * Removes the specified queue from the persistent store.
      *
-     * @param name The queue to remove.
-     *
+     * @param queue The queue to remove.
      * @throws AMQException If the operation fails for any reason.
      */
-    public void removeQueue(AMQShortString name) throws AMQException
+    public void removeQueue(final AMQQueue queue) throws AMQException
     {
+        AMQShortString name = queue.getName();
+
         _log.debug("public void removeQueue(AMQShortString name = " + name + "): called");
 
+
         DatabaseEntry key = new DatabaseEntry();
         EntryBinding keyBinding = new AMQShortStringTupleBinding();
         keyBinding.objectToEntry(name, key);
@@ -693,16 +697,17 @@
      * Places a message onto a specified queue, in a given transactional context.
      *
      * @param context   The transactional context for the operation.
-     * @param name      The name of the queue to place the message on.
+     * @param queue     The the queue to place the message on.
      * @param messageId The message to enqueue.
      *
      * @throws AMQException If the operation fails for any reason.
      */
-    public void enqueueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException
+    public void enqueueMessage(StoreContext context, AMQQueue queue, Long messageId) throws AMQException
     {
         // _log.debug("public void enqueueMessage(StoreContext context = " + context + ", AMQShortString name = " + name
         // + ", Long messageId): called");
 
+        AMQShortString name = queue.getName();
         Transaction tx = (Transaction) context.getPayload();
         DatabaseEntry key = new DatabaseEntry();
         EntryBinding keyBinding = new DeliveryDetailsKey.TupleBinding();
@@ -753,13 +758,13 @@
      * Extracts a message from a specified queue, in a given transactional context.
      *
      * @param context   The transactional context for the operation.
-     * @param name      The name of the queue to take the message from.
+     * @param queue     The name queue to take the message from.
      * @param messageId The message to dequeue.
-     *
-     * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
+ * @throws AMQException If the operation fails for any reason, or if the specified message does not exist.
      */
-    public void dequeueMessage(StoreContext context, AMQShortString name, Long messageId) throws AMQException
+    public void dequeueMessage(StoreContext context, final AMQQueue queue, Long messageId) throws AMQException
     {
+        AMQShortString name = queue.getName();
         boolean isLocal = getOrCreateTransaction(context);
         Transaction tx = (Transaction) context.getPayload();
 
@@ -1089,7 +1094,7 @@
             _log.debug("public void storeMessageMetaData(StoreContext context = " + context + ", Long messageId = "
                 + messageId + ", MessageMetaData messageMetaData = " + messageMetaData + "): called");
         }
-
+        getOrCreateTransaction(context);
         Transaction tx = (Transaction) context.getPayload();
         DatabaseEntry key = new DatabaseEntry();
         EntryBinding keyBinding = TupleBinding.getPrimitiveBinding(Long.class);
@@ -1243,7 +1248,7 @@
 
         public void process() throws AMQException
         {
-            _queue.process(_context, _queue.createEntry(_message), false);
+            _queue.enqueue(_context, _message);
         }
 
     }
@@ -1269,7 +1274,7 @@
             MessageHandleFactory messageHandleFactory = new MessageHandleFactory();
             long maxId = 1;
 
-            TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null, null);
+            TransactionalContext txnContext = new NonTransactionalContext(this, new StoreContext(), null, null);
 
             while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
             {
@@ -1281,7 +1286,7 @@
                 AMQQueue queue = queues.get(queueName);
                 if (queue == null)
                 {
-                    queue = new AMQQueue(queueName, false, null, false, _virtualHost);
+                    queue =  AMQQueueFactory.createAMQQueueImpl(queueName, false, null, false, _virtualHost, null);
                     _virtualHost.getQueueRegistry().registerQueue(queue);
                     queues.put(queueName, queue);
                 }
@@ -1439,9 +1444,9 @@
         {
             // _log.debug("public void commit(): called");
 
+            _commitThread.addJob(this);
             synchronized (this)
             {
-                _commitThread.addJob(this);
                 while (!_complete)
                 {
                     try
@@ -1478,8 +1483,10 @@
         // private final Logger _log = Logger.getLogger(CommitThread.class);
 
         private final AtomicBoolean _stopped = new AtomicBoolean(false);
-        private final Queue<Commit> _jobQueue = new ConcurrentLinkedQueue<Commit>();
+        private final AtomicReference<Queue<Commit>> _jobQueue = new AtomicReference<Queue<Commit>>(new ConcurrentLinkedQueue<Commit>());
         private final CheckpointConfig _config = new CheckpointConfig();
+        private final Object _lock = new Object();
+        private AtomicBoolean _hasJobs = new AtomicBoolean(false);
 
         public CommitThread(String name)
         {
@@ -1494,15 +1501,15 @@
             {
                 try
                 {
-                    synchronized (this)
+                    synchronized (_lock)
                     {
                         while (!_stopped.get() && !hasJobs())
                         {
-                            wait();
+                            _lock.wait();
                         }
+                    }
+                    processJobs();
 
-                        processJobs();
-                    }
                 }
                 catch (InterruptedException e)
                 {
@@ -1515,33 +1522,27 @@
         {
             // _log.debug("private void processJobs(): called");
 
-            List<Commit> jobs = new LinkedList<Commit>();
+            // we replace the old queue atomically with a new one and this avoids any need to
+            // copy elements out of the queue
+            Queue<Commit> jobs = _jobQueue.getAndSet(new ConcurrentLinkedQueue<Commit>());
 
-            Commit job;
-
-            while ((job = _jobQueue.poll()) != null)
-            {
-                jobs.add(job);
-            }
-
             try
             {
                 // _environment.checkpoint(_config);
                 _environment.sync();
-
-                // _log.info("Commited " + jobs.size() + " jobs.");
-
-                /*  Iterator<Commit> iter = jobs.iterator();
-                  while(iter.hasNext())
-                  {
-                      Commit commit = iter.next();
-                      commit.prepare(!iter.hasNext());
-                  }
-                 */
+                
                 for (Commit commit : jobs)
                 {
                     commit.complete();
                 }
+                if(_jobQueue.get().isEmpty())
+                {
+                    _hasJobs.set(false);
+                    if(!_jobQueue.get().isEmpty())
+                    {
+                        _hasJobs.set(true);
+                    }
+                }
 
             }
             catch (DatabaseException e)
@@ -1556,19 +1557,28 @@
 
         private boolean hasJobs()
         {
-            return !_jobQueue.isEmpty();
+            return !_jobQueue.get().isEmpty();
         }
 
-        public synchronized void addJob(Commit commit)
+        public void addJob(Commit commit)
         {
-            _jobQueue.add(commit);
-            notify();
+            _jobQueue.get().add(commit);
+            if(_hasJobs.compareAndSet(false, true))
+            {
+                synchronized(_lock)
+                {
+                    _lock.notifyAll();
+                }
+            }
         }
 
-        public synchronized void close()
+        public void close()
         {
             _stopped.set(true);
-            notify();
+            synchronized(_lock)
+            {
+                _lock.notifyAll();
+            }
         }
     }
 }

Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java	2008-05-13 09:29:57 UTC (rev 2039)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java	2008-05-13 10:36:26 UTC (rev 2040)
@@ -32,7 +32,7 @@
 import org.apache.qpid.server.store.StoreContext;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.queue.AMQQueueFactory;
 import org.apache.qpid.server.registry.ApplicationRegistry;
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.txn.TransactionalContext;
@@ -42,7 +42,6 @@
 import java.io.File;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.HashSet;
 
 public class BDBStoreTest extends TestCase
 {
@@ -84,8 +83,7 @@
         _store.setVirtualHost(_virtualHost);
         _store.startCommitThread();
 
-        _txnContext = new NonTransactionalContext(_store, _storeContext, null, new LinkedList<RequiredDeliveryException>(),
-                                                  new HashSet<Long>());
+        _txnContext = new NonTransactionalContext(_store, _storeContext, null, new LinkedList<RequiredDeliveryException>());
     }
 
     public void tearDown() throws Exception
@@ -95,7 +93,7 @@
 
     public void testQueuePersistence() throws DatabaseException, AMQException
     {
-        AMQQueue queue = new AMQQueue(QUEUE1, true, ME, false, _virtualHost);
+        AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
         _store.createQueue(queue);
         AMQQueue returnedQueue = _store.getQueue(QUEUE1);
 
@@ -262,12 +260,12 @@
         _store.storeMessageMetaData(_storeContext, 22L, new MessageMetaData(pubBody, chb, 0));
 
 
-        AMQQueue queue = new AMQQueue(QUEUE1, true, ME, false, _virtualHost);
+        AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
         _store.createQueue(queue);
 
         _store.beginTran(_storeContext);
-        _store.enqueueMessage(_storeContext, QUEUE1, 20L);
-        _store.enqueueMessage(_storeContext, QUEUE1, 21L);
+        _store.enqueueMessage(_storeContext, queue, 20L);
+        _store.enqueueMessage(_storeContext, queue, 21L);
         _store.commitTran(_storeContext);
 
         List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
@@ -291,17 +289,17 @@
         _store.storeMessageMetaData(_storeContext, 31L, new MessageMetaData(pubBody, chb, 0));
         _store.storeMessageMetaData(_storeContext, 32L, new MessageMetaData(pubBody, chb, 0));
 
-        AMQQueue queue = new AMQQueue(QUEUE1, true, ME, false, _virtualHost);
+        AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
         _store.createQueue(queue);
 
 
         _store.beginTran(_storeContext);
-        _store.enqueueMessage(_storeContext, QUEUE1, 30L);
-        _store.enqueueMessage(_storeContext, QUEUE1, 31L);
+        _store.enqueueMessage(_storeContext, queue, 30L);
+        _store.enqueueMessage(_storeContext, queue, 31L);
         _store.commitTran(_storeContext);
 
         _store.beginTran(_storeContext);
-        _store.enqueueMessage(_storeContext, QUEUE1, 32L);
+        _store.enqueueMessage(_storeContext, queue, 32L);
         _store.abortTran(_storeContext);
 
         _store.beginTran(_storeContext);
@@ -329,16 +327,16 @@
         _store.storeMessageMetaData(_storeContext, 32L, new MessageMetaData(pubBody, chb, 0));
 
 
-        AMQQueue queue = new AMQQueue(QUEUE1, true, ME, false, _virtualHost);
+        AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
         _store.createQueue(queue);
 
         _store.beginTran(_storeContext);
-        _store.enqueueMessage(_storeContext, QUEUE1, 30L);
+        _store.enqueueMessage(_storeContext, queue, 30L);
         _store.abortTran(_storeContext);
 
         _store.beginTran(_storeContext);
-        _store.enqueueMessage(_storeContext, QUEUE1, 31L);
-        _store.enqueueMessage(_storeContext, QUEUE1, 32L);
+        _store.enqueueMessage(_storeContext, queue, 31L);
+        _store.enqueueMessage(_storeContext, queue, 32L);
         _store.commitTran(_storeContext);
 
         List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
@@ -363,19 +361,19 @@
         _store.storeMessageMetaData(_storeContext, 42L, new MessageMetaData(pubBody, chb, 0));
 
 
-        AMQQueue queue = new AMQQueue(QUEUE1, true, ME, false, _virtualHost);
-        AMQQueue queue2 = new AMQQueue(QUEUE2, true, HIM, false, _virtualHost);
+        AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
+        AMQQueue queue2 = AMQQueueFactory.createAMQQueueImpl(QUEUE2, true, HIM, false, _virtualHost, null);
 
         _store.createQueue(queue);
         _store.createQueue(queue2);
 
         _store.beginTran(_storeContext);
-        _store.enqueueMessage(_storeContext, QUEUE1, 40L);
-        _store.enqueueMessage(_storeContext, QUEUE1, 41L);
-        _store.enqueueMessage(_storeContext, QUEUE2, 42L);
+        _store.enqueueMessage(_storeContext, queue, 40L);
+        _store.enqueueMessage(_storeContext, queue, 41L);
+        _store.enqueueMessage(_storeContext, queue2, 42L);
         _store.commitTran(_storeContext);
 
-        _store.enqueueMessage(_storeContext, QUEUE1, 42L);
+        _store.enqueueMessage(_storeContext, queue, 42L);
 
         _virtualHost.getQueueRegistry().unregisterQueue(queue.getName());
         _virtualHost.getQueueRegistry().unregisterQueue(queue2.getName());
@@ -421,21 +419,21 @@
 
         _store.storeMessageMetaData(_storeContext, 50L, new MessageMetaData(pubBody, chb, 0));
 
-        AMQQueue queue = new AMQQueue(QUEUE1, true, ME, false, _virtualHost);
+        AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
         _store.createQueue(queue);
 
-        _store.enqueueMessage(_storeContext, QUEUE1, 50L);
-        _store.dequeueMessage(_storeContext, QUEUE1, 50L);
+        _store.enqueueMessage(_storeContext, queue, 50L);
+        _store.dequeueMessage(_storeContext, queue, 50L);
     }
 
     public void testQueueRemove() throws AMQException
     {
-        AMQQueue queue = new AMQQueue(QUEUE1, true, ME, false, _virtualHost);
+        AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
         _store.createQueue(queue);
-        _store.removeQueue(QUEUE1);
+        _store.removeQueue(queue);
         try
         {
-            _store.removeQueue(QUEUE1);
+            _store.removeQueue(queue);
             Assert.fail("No exception thrown when deleting non-existant queue");
         }
         catch (AMQException e)




More information about the rhmessaging-commits mailing list