[rhmessaging-commits] rhmessaging commits: r3283 - in store/branches/java/0.5-release/src/main/java/org/apache/qpid/server/store/berkeleydb: tuples and 1 other directory.

rhmessaging-commits at lists.jboss.org rhmessaging-commits at lists.jboss.org
Mon Apr 13 13:25:16 EDT 2009


Author: ritchiem
Date: 2009-04-13 13:25:15 -0400 (Mon, 13 Apr 2009)
New Revision: 3283

Modified:
   store/branches/java/0.5-release/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java
   store/branches/java/0.5-release/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java
Log:
Merge more from trunk r3125. Changes look up existing model objects (Queue/Exchange) in the broker before creating new ones.


Modified: store/branches/java/0.5-release/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java
===================================================================
--- store/branches/java/0.5-release/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java	2009-04-13 16:09:04 UTC (rev 3282)
+++ store/branches/java/0.5-release/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java	2009-04-13 17:25:15 UTC (rev 3283)
@@ -1,21 +1,18 @@
 package org.apache.qpid.server.store.berkeleydb;
 
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.AMQException;
-
 import com.sleepycat.bind.tuple.TupleBinding;
 import com.sleepycat.bind.tuple.TupleInput;
 import com.sleepycat.bind.tuple.TupleOutput;
 import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.virtualhost.VirtualHost;
 
 public class ExchangeTB extends TupleBinding
 {
     private static final Logger _log = Logger.getLogger(ExchangeTB.class);
 
-
-
     private final VirtualHost _virtualHost;
 
     public ExchangeTB(VirtualHost virtualHost)
@@ -29,12 +26,28 @@
         AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
         AMQShortString typeName = AMQShortStringEncoding.readShortString(tupleInput);
 
-
         boolean autoDelete = tupleInput.readBoolean();
 
         try
         {
-            return _virtualHost.getExchangeFactory().createExchange(name, typeName, true, autoDelete, 0);
+            Exchange exchange;
+            Exchange existing = _virtualHost.getExchangeRegistry().getExchange(name);
+
+            if (existing != null)
+            {
+                _log.info("Exchange :" + existing + ": already exists in configured broker.");
+                exchange = existing;
+            }
+            else
+            {
+                exchange = _virtualHost.getExchangeFactory().createExchange(name, typeName, true, autoDelete, 0);
+
+                _virtualHost.getExchangeRegistry().registerExchange(exchange);
+
+                _log.info("Registering new durable exchange from BDB:" + exchange);
+            }
+
+            return exchange;
         }
         catch (AMQException e)
         {
@@ -47,10 +60,9 @@
     {
         Exchange exchange = (Exchange) object;
 
+        AMQShortStringEncoding.writeShortString(exchange.getName(), tupleOutput);
+        AMQShortStringEncoding.writeShortString(exchange.getType(), tupleOutput);
 
-        AMQShortStringEncoding.writeShortString(exchange.getName(),tupleOutput);
-        AMQShortStringEncoding.writeShortString(exchange.getType(),tupleOutput);
-
         tupleOutput.writeBoolean(exchange.isAutoDelete());
 
     }

Modified: store/branches/java/0.5-release/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java
===================================================================
--- store/branches/java/0.5-release/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java	2009-04-13 16:09:04 UTC (rev 3282)
+++ store/branches/java/0.5-release/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java	2009-04-13 17:25:15 UTC (rev 3283)
@@ -20,14 +20,14 @@
 import com.sleepycat.bind.tuple.TupleBinding;
 import com.sleepycat.bind.tuple.TupleInput;
 import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
 import org.apache.qpid.server.queue.AMQQueue;
 import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
 import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.log4j.Logger;
 
 public class QueueTuple_1 extends TupleBinding implements QueueTuple
 {
@@ -76,7 +76,26 @@
     {
         try
         {
-            return AMQQueueFactory.createAMQQueueImpl(name, true, owner, false, _virtualHost, arguments);
+
+            AMQQueue queue;
+            AMQQueue existing = _virtualHost.getQueueRegistry().getQueue(name);
+
+            if (existing != null)
+            {
+                _logger.info("Queue :" + existing + ": already exists in configured broker.");
+
+                queue = existing;
+            }
+            else
+            {
+                // No queue with this name is registered yet: create it and register it
+                queue = AMQQueueFactory.createAMQQueueImpl(name, true, owner, false, _virtualHost, arguments);
+                _virtualHost.getQueueRegistry().registerQueue(queue);
+                _logger.info("Recovering queue " + queue.getName() + " with owner:"
+                             + ((queue.getOwner() == null) ? " <<null>> " : (" \"" + queue.getOwner() + "\" ")));
+            }
+
+            return queue;
         }
         catch (AMQException e)
         {




More information about the rhmessaging-commits mailing list