Author: ritchiem
Date: 2009-04-13 13:25:15 -0400 (Mon, 13 Apr 2009)
New Revision: 3283
Modified:
store/branches/java/0.5-release/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java
store/branches/java/0.5-release/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java
Log:
Merge more from trunk r3125. Changes are to lookup broker for existing Model
objects(Queue/Exchange)
Modified:
store/branches/java/0.5-release/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java
===================================================================
---
store/branches/java/0.5-release/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java 2009-04-13
16:09:04 UTC (rev 3282)
+++
store/branches/java/0.5-release/src/main/java/org/apache/qpid/server/store/berkeleydb/ExchangeTB.java 2009-04-13
17:25:15 UTC (rev 3283)
@@ -1,21 +1,18 @@
package org.apache.qpid.server.store.berkeleydb;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.AMQException;
-
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.virtualhost.VirtualHost;
public class ExchangeTB extends TupleBinding
{
private static final Logger _log = Logger.getLogger(ExchangeTB.class);
-
-
private final VirtualHost _virtualHost;
public ExchangeTB(VirtualHost virtualHost)
@@ -29,12 +26,28 @@
AMQShortString name = AMQShortStringEncoding.readShortString(tupleInput);
AMQShortString typeName = AMQShortStringEncoding.readShortString(tupleInput);
-
boolean autoDelete = tupleInput.readBoolean();
try
{
- return _virtualHost.getExchangeFactory().createExchange(name, typeName, true,
autoDelete, 0);
+ Exchange exchange;
+ Exchange existing = _virtualHost.getExchangeRegistry().getExchange(name);
+
+ if (existing != null)
+ {
+ _log.info("Exchange :" + existing + ": already exists in
configured broker.");
+ exchange = existing;
+ }
+ else
+ {
+ exchange = _virtualHost.getExchangeFactory().createExchange(name,
typeName, true, autoDelete, 0);
+
+ _virtualHost.getExchangeRegistry().registerExchange(exchange);
+
+ _log.info("Registering new durable exchange from BDB:" +
exchange);
+ }
+
+ return exchange;
}
catch (AMQException e)
{
@@ -47,10 +60,9 @@
{
Exchange exchange = (Exchange) object;
+ AMQShortStringEncoding.writeShortString(exchange.getName(), tupleOutput);
+ AMQShortStringEncoding.writeShortString(exchange.getType(), tupleOutput);
- AMQShortStringEncoding.writeShortString(exchange.getName(),tupleOutput);
- AMQShortStringEncoding.writeShortString(exchange.getType(),tupleOutput);
-
tupleOutput.writeBoolean(exchange.isAutoDelete());
}
Modified:
store/branches/java/0.5-release/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java
===================================================================
---
store/branches/java/0.5-release/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java 2009-04-13
16:09:04 UTC (rev 3282)
+++
store/branches/java/0.5-release/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/QueueTuple_1.java 2009-04-13
17:25:15 UTC (rev 3283)
@@ -20,14 +20,14 @@
import com.sleepycat.bind.tuple.TupleBinding;
import com.sleepycat.bind.tuple.TupleInput;
import com.sleepycat.bind.tuple.TupleOutput;
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.store.berkeleydb.AMQShortStringEncoding;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.log4j.Logger;
public class QueueTuple_1 extends TupleBinding implements QueueTuple
{
@@ -76,7 +76,26 @@
{
try
{
- return AMQQueueFactory.createAMQQueueImpl(name, true, owner, false,
_virtualHost, arguments);
+
+ AMQQueue queue;
+ AMQQueue existing = _virtualHost.getQueueRegistry().getQueue(name);
+
+ if (existing != null)
+ {
+ _logger.info("Queue :" + existing + ": already exists in
configured broker.");
+
+ queue = existing;
+ }
+ else
+ {
+ // Retrieve the existing Queue object
+ queue = AMQQueueFactory.createAMQQueueImpl(name, true, owner, false,
_virtualHost, arguments);
+ _virtualHost.getQueueRegistry().registerQueue(queue);
+ _logger.info("Recovering queue " + queue.getName() + "
with owner:"
+ + ((queue.getOwner() == null) ? " <<null>>
" : (" \"" + queue.getOwner() + "\" ")));
+ }
+
+ return queue;
}
catch (AMQException e)
{