[rhmessaging-commits] rhmessaging commits: r3155 - in store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb: tuples and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Fri Mar 13 08:22:07 EDT 2009


Author: ritchiem
Date: 2009-03-13 08:22:07 -0400 (Fri, 13 Mar 2009)
New Revision: 3155

Modified:
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java
   store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java
Log:
QPID-1730 : Update the recovery of queues and exchanges to look up each object in the broker and reuse it if it already exists, rather than always creating a new one. This does mean that any dynamic changes will be lost, but I'm not sure they are persisted anyway.

Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java	2009-03-12 15:04:25 UTC (rev 3154)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java	2009-03-13 12:22:07 UTC (rev 3155)
@@ -41,6 +41,7 @@
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 import org.apache.qpid.server.exchange.Exchange;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
@@ -57,7 +58,6 @@
 import org.apache.qpid.server.txn.NonTransactionalContext;
 import org.apache.qpid.server.txn.TransactionalContext;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
 
 import java.io.File;
 import java.util.ArrayList;
@@ -480,7 +480,7 @@
     /**
      * Removes the specified message from the store in the given transactional store context.
      * Internal method that is package scoped to allow testing.
-     *  
+     *
      * @param context   The transactional context to remove the message in.
      * @param messageId Identifies the message to remove.
      *
@@ -649,7 +649,7 @@
 
     private void recoverExchange(Exchange exchange) throws AMQException, DatabaseException
     {
-        _log.info("Recovering durable exchange " + exchange.getName() + " of type " + exchange.getType() + "...");
+        _log.info("Recovering bindings for durable exchange:" + exchange);
 
         QueueRegistry queueRegistry = _virtualHost.getQueueRegistry();
 
@@ -732,10 +732,7 @@
             {
                 Exchange exchange = (Exchange) binding.entryToObject(value);
 
-                _virtualHost.getExchangeRegistry().registerExchange(exchange);
-
                 exchanges.add(exchange);
-                _log.info("Registering exchange " + exchange.getName());
             }
 
             return exchanges;
@@ -834,10 +831,18 @@
      */
     public void createQueue(AMQQueue queue, FieldTable arguments) throws AMQException
     {
-        _log.debug("public void createQueue(AMQQueue queue(" + queue.getName() + ") = " + queue + "): called");
 
         if (_state != State.RECOVERING)
         {
+
+            // Check that we don't already have this queue in our map.
+            if (_queueNameToIdMap.containsKey(queue.getName()))
+            {
+                return;
+            }
+
+            _log.debug("BDBMessageStore.createQueue(AMQQueue queue(" + queue + "," + arguments + "): called");
+
             long queueId = _queueId.getAndIncrement();
             _queueNameToIdMap.put(queue.getName(), queueId);
 
@@ -1057,7 +1062,7 @@
             {
                 commit(tx);
                 context.setPayload(null);
-            }                                            
+            }
         }
         catch (DatabaseException e)
         {
@@ -1494,13 +1499,8 @@
             while (cursor.getNext(key, value, LockMode.RMW) == OperationStatus.SUCCESS)
             {
                 AMQQueue queue = (AMQQueue) binding.entryToObject(value);
-                if (queue != null)
-                {
-                    _virtualHost.getQueueRegistry().registerQueue(queue);
-                    queues.put(queue.getName(), queue);
-                    _log.info("Recovering queue " + queue.getName() + " with owner:"
-                              + ((queue.getOwner() == null) ? " <<null>> " : (" \"" + queue.getOwner() + "\" ")));
-                }
+
+                queues.put(queue.getName(), queue);
             }
 
             return queues;
@@ -1619,7 +1619,6 @@
         }
     }
 
-
     private void deliverMessages(final StoreContext context, Map<AMQShortString, AMQQueue> queues)
             throws DatabaseException, AMQException
     {

Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java	2009-03-12 15:04:25 UTC (rev 3154)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java	2009-03-13 12:22:07 UTC (rev 3155)
@@ -1,21 +1,18 @@
 package org.apache.qpid.server.store.berkeleydb;
 
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.AMQException;
-
 import com.sleepycat.bind.tuple.TupleBinding;
 import com.sleepycat.bind.tuple.TupleInput;
 import com.sleepycat.bind.tuple.TupleOutput;
 import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 
 public class ExchangeTB extends TupleBinding
 {
     private static final Logger _log = Logger.getLogger(ExchangeTB.class);
 
-
-
     private final VirtualHost _virtualHost;
 
     public ExchangeTB(VirtualHost virtualHost)
@@ -29,12 +26,28 @@
         AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
         AMQShortString typeName = AMQShortStringEncoding.readShortString(tupleInput);
 
-
         boolean autoDelete = tupleInput.readBoolean();
 
         try
         {
-            return _virtualHost.getExchangeFactory().createExchange(name, typeName, true, autoDelete, 0);
+            Exchange exchange;
+            Exchange existing = _virtualHost.getExchangeRegistry().getExchange(name);
+
+            if (existing != null)
+            {
+                _log.info("Exchange :" + existing + ": already exists in configured broker.");
+                exchange = existing;
+            }
+            else
+            {
+                exchange = _virtualHost.getExchangeFactory().createExchange(name, typeName, true, autoDelete, 0);
+
+                _virtualHost.getExchangeRegistry().registerExchange(exchange);
+
+                _log.info("Registering new durable exchange from BDB:" + exchange);
+            }
+
+            return exchange;
         }
         catch (AMQException e)
         {
@@ -47,10 +60,9 @@
     {
         Exchange exchange = (Exchange) object;
 
+        AMQShortStringEncoding.writeShortString(exchange.getName(), tupleOutput);
+        AMQShortStringEncoding.writeShortString(exchange.getType(), tupleOutput);
 
-        AMQShortStringEncoding.writeShortString(exchange.getName(),tupleOutput);
-        AMQShortStringEncoding.writeShortString(exchange.getType(),tupleOutput);
-
         tupleOutput.writeBoolean(exchange.isAutoDelete());
 
     }

Modified: store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java
===================================================================
--- store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java	2009-03-12 15:04:25 UTC (rev 3154)
+++ store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java	2009-03-13 12:22:07 UTC (rev 3155)
@@ -20,14 +20,14 @@
 import com.sleepycat.bind.tuple.TupleBinding;
 import com.sleepycat.bind.tuple.TupleInput;
 import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.log4j.Logger;
 
 public class QueueTuple_1 extends TupleBinding implements QueueTuple
 {
@@ -76,7 +76,26 @@
     {
         try
         {
-            return AMQQueueFactory.createAMQQueueImpl(name, true, owner, false, _virtualHost, arguments);
+
+            AMQQueue queue;
+            AMQQueue existing = _virtualHost.getQueueRegistry().getQueue(name);
+
+            if (existing != null)
+            {
+                _logger.info("Queue :" + existing + ": already exists in configured broker.");
+
+                queue = existing;
+            }
+            else
+            {
+                // No queue with this name is registered in the broker: create one and register it
+                queue = AMQQueueFactory.createAMQQueueImpl(name, true, owner, false, _virtualHost, arguments);
+                _virtualHost.getQueueRegistry().registerQueue(queue);
+                _logger.info("Recovering queue " + queue.getName() + " with owner:"
+                             + ((queue.getOwner() == null) ? " <<null>> " : (" \"" + queue.getOwner() + "\" ")));
+            }
+
+            return queue;
         }
         catch (AMQException e)
         {




More information about the rhmessaging-commits mailing list