Author: ritchiem
Date: 2009-08-07 06:11:31 -0400 (Fri, 07 Aug 2009)
New Revision: 3551
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
Log:
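Update BDBMessageStore to emit the new operational logging messages (MessageStoreMessages MST_1002, MST_1004, MST_1005 and MST_1006); the class now extends AbstractMessageStore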
Modified:
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
---
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2009-08-07
10:03:43 UTC (rev 3550)
+++
store/trunk/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2009-08-07
10:11:31 UTC (rev 3551)
@@ -43,6 +43,7 @@
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.AbstractMessageStore;
import org.apache.qpid.server.store.berkeleydb.tuples.QueueTupleBindingFactory;
import org.apache.qpid.server.store.berkeleydb.tuples.BindingTupleBindingFactory;
import org.apache.qpid.server.store.berkeleydb.tuples.QueueTuple;
@@ -50,6 +51,8 @@
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.messages.MessageStoreMessages;
import java.io.File;
import java.util.ArrayList;
@@ -73,7 +76,7 @@
* exchanges. <tr><td> Store and remove messages. <tr><td> Bind
and unbind queues to exchanges. <tr><td> Enqueue and
* dequeue messages to queues. <tr><td> Generate message identifiers.
</table>
*/
-public class BDBMessageStore implements MessageStore
+public class BDBMessageStore extends AbstractMessageStore
{
private static final Logger _log = Logger.getLogger(BDBMessageStore.class);
@@ -184,6 +187,8 @@
*/
public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration
vHostConfig) throws Exception
{
+ super.configure(virtualHost, base, vHostConfig);
+
Configuration config = vHostConfig.getStoreConfiguration();
File environmentPath = new File(config.getString(ENVIRONMENT_PATH_PROPERTY,
"bdbEnv"));
if (!environmentPath.exists())
@@ -195,6 +200,8 @@
}
}
+ CurrentActor.get().message(_logSubject,
MessageStoreMessages.MST_1002(environmentPath.getAbsolutePath()));
+
_version = config.getInt(DATABASE_FORMAT_VERSION_PROPERTY,
DATABASE_FORMAT_VERSION);
configure(virtualHost, environmentPath, false);
@@ -447,6 +454,8 @@
closeEnvironment();
_state = State.CLOSED;
+
+ super.close();
}
private void closeEnvironment() throws DatabaseException
@@ -1220,6 +1229,8 @@
stateTransition(State.CONFIGURED, State.RECOVERING);
_log.info("Recovering persistent state...");
+ CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_1004(null,
false));
+
StoreContext context = new StoreContext();
try
{
@@ -1228,13 +1239,18 @@
for (AMQQueue q : queues.values())
{
+
CurrentActor.get().message(_logSubject,MessageStoreMessages.MST_1004(String.valueOf(q.getName()),
true));
+
q.configure(virtualHost.getConfiguration().getQueueConfiguration(q.getName().asString()));
+
}
recoverExchanges();
deliverMessages(context, queues);
_log.info("Persistent state recovered successfully");
+ CurrentActor.get().message(_logSubject, MessageStoreMessages.MST_1006(null,
false));
+
commitTran(context);
}
@@ -1658,18 +1674,14 @@
_log.debug("On recovery, delivering Message ID:" +
message.getMessageId() + " to " + queue.getName());
}
- if (_log.isInfoEnabled())
+ Integer count = queueRecoveries.get(queueName);
+ if (count == null)
{
- Integer count = queueRecoveries.get(queueName);
- if (count == null)
- {
- count = 0;
- }
+ count = 0;
+ }
- queueRecoveries.put(queueName, ++count);
+ queueRecoveries.put(queueName, ++count);
- }
-
actions.add(new ProcessAction(queue, context, message));
}
@@ -1703,6 +1715,13 @@
{
_log.info("Recovered message counts: " + queueRecoveries);
}
+
+ for(Map.Entry<AMQShortString,Integer> entry : queueRecoveries.entrySet())
+ {
+ CurrentActor.get().message(_logSubject,
MessageStoreMessages.MST_1005(entry.getValue(), String.valueOf(entry.getKey())));
+
+ CurrentActor.get().message(_logSubject,
MessageStoreMessages.MST_1006(String.valueOf(entry.getKey()), true));
+ }
}
QueueRegistry getQueueRegistry()