[rhmessaging-commits] rhmessaging commits: r3778 - in store/branches/java/0.5-release: src/tools/java and 1 other directories.
rhmessaging-commits at lists.jboss.org
rhmessaging-commits at lists.jboss.org
Mon Jan 11 10:00:28 EST 2010
Author: rgemmell
Date: 2010-01-11 10:00:27 -0500 (Mon, 11 Jan 2010)
New Revision: 3778
Added:
store/branches/java/0.5-release/bin/bindingsWorkaround.sh
store/branches/java/0.5-release/src/tools/java/BDBStoreBindingsWorkaround-log4j.xml
store/branches/java/0.5-release/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreBindingsWorkaround.java
Log:
Add initial version of a tool to work around the broker bindings issue. It uses the broker config to load the VirtualHosts and then inserts the required durable Exchange entries into the store.
Added: store/branches/java/0.5-release/bin/bindingsWorkaround.sh
===================================================================
--- store/branches/java/0.5-release/bin/bindingsWorkaround.sh (rev 0)
+++ store/branches/java/0.5-release/bin/bindingsWorkaround.sh 2010-01-11 15:00:27 UTC (rev 3778)
@@ -0,0 +1,26 @@
+#!/bin/bash
+#
+# Launches the BDBStoreBindingsWorkaround tool using the jars under $QPID_HOME/lib.
+# Requires QPID_HOME and QPID_WORK to be set; all script arguments are passed
+# through to the tool unchanged. Exits with the tool's exit status.
+
+QPID_VERSION=0.5
+
+if [ -z "$QPID_HOME" ]; then
+    echo "QPID_HOME not set. Exiting"
+    exit 1
+fi
+
+if [ -z "$QPID_WORK" ]; then
+    echo "QPID_WORK not set. Exiting"
+    exit 1
+fi
+
+# Held in an array so that each -D option stays a single argument even when
+# QPID_HOME or QPID_WORK contains whitespace.
+QPID_SYS_PROPS=("-DQPID_HOME=$QPID_HOME" "-DQPID_WORK=$QPID_WORK")
+
+LIBS="$QPID_HOME/lib/qpid-bdbtools-$QPID_VERSION.jar:$QPID_HOME/lib/je-3.3.62.jar:$QPID_HOME/lib/qpid-bdbstore-$QPID_VERSION.jar:$QPID_HOME/lib/qpid-all.jar"
+
+java -Dlog4j.configuration=BDBStoreBindingsWorkaround-log4j.xml "${QPID_SYS_PROPS[@]}" -cp "$LIBS" org.apache.qpid.server.store.berkeleydb.BDBStoreBindingsWorkaround "$@"
+
+# Hand the tool's exit status back to the caller.
+exit $?
+
Added: store/branches/java/0.5-release/src/tools/java/BDBStoreBindingsWorkaround-log4j.xml
===================================================================
--- store/branches/java/0.5-release/src/tools/java/BDBStoreBindingsWorkaround-log4j.xml (rev 0)
+++ store/branches/java/0.5-release/src/tools/java/BDBStoreBindingsWorkaround-log4j.xml 2010-01-11 15:00:27 UTC (rev 3778)
@@ -0,0 +1,45 @@
+<?xml version="1.0"?>
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
+<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd">
+<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/">
+
+ <appender name="STDOUT" class="org.apache.log4j.ConsoleAppender"> <!-- console output; the pattern below is date, level, message -->
+ <layout class="org.apache.log4j.PatternLayout">
+ <param name="ConversionPattern" value="%d %-5p - %m%n"/>
+ </layout>
+ </appender>
+
+ <category name="org.apache.qpid.server.store.berkeleydb.BDBStoreBindingsWorkaround"> <!-- the tool's own messages: info and above -->
+ <priority value="info"/>
+ </category>
+
+ <!-- Only show errors from the remaining org.apache classes -->
+ <category name="org.apache">
+ <priority value="error"/>
+ </category>
+
+ <root> <!-- default for everything else: errors only, written to the STDOUT appender -->
+ <priority value="error"/>
+ <appender-ref ref="STDOUT"/>
+ </root>
+
+</log4j:configuration>
Added: store/branches/java/0.5-release/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreBindingsWorkaround.java
===================================================================
--- store/branches/java/0.5-release/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreBindingsWorkaround.java (rev 0)
+++ store/branches/java/0.5-release/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreBindingsWorkaround.java 2010-01-11 15:00:27 UTC (rev 3778)
@@ -0,0 +1,310 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.qpid.server.store.berkeleydb.BDBMessageStore;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.configuration.Configuration;
+import org.apache.qpid.configuration.Configuration.InitException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+import org.apache.qpid.server.store.berkeleydb.DatabaseVisitor;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+public class BDBStoreBindingsWorkaround
+{
+ private static final Logger _logger = LoggerFactory.getLogger(BDBStoreBindingsWorkaround.class);
+
+ private Configuration _config; // created and populated from the command line by loadConfig()
+ private boolean _initialised = false; // set to true by loadVirtualHosts() on success; checked by start() and shutdown()
+
+ /**
+  * Command line entry point: builds the tool from the arguments, runs it and then exits the JVM.
+  *
+  * @param args command line arguments, passed on to the constructor
+  * @throws Configuration.InitException propagated from the constructor
+  */
+ public static void main(String[] args) throws Configuration.InitException
+ {
+     new BDBStoreBindingsWorkaround(args).start();
+
+     // Exit explicitly; the registered ShutdownHook then stops the VirtualHosts.
+     System.exit(0);
+ }
+
+ public BDBStoreBindingsWorkaround(String[] args) throws InitException
+ {
+ Runtime.getRuntime().addShutdownHook(new Thread(new ShutdownHook(this)));
+
+ loadConfig(args);
+ }
+
+ @SuppressWarnings("static-access")
+ private void loadConfig(String args[]) throws InitException
+ {
+ _config = new Configuration();
+
+ Option configFile =
+ OptionBuilder.withArgName("file").hasArg()
+ .withDescription("use given configuration file By "
+ + "default looks for a file named "
+ + Configuration.DEFAULT_CONFIG_FILE + " in " + Configuration.QPID_HOME)
+ .withLongOpt("config")
+ .create("c");
+
+ _config.setOption(configFile);
+
+ _config.processCommandline(args);
+ }
+
+ /**
+  * Runnable registered as a JVM shutdown hook by the constructor. When run, it
+  * delegates to the shutdown() method of the tool it was created with.
+  */
+ protected class ShutdownHook implements Runnable
+ {
+     BDBStoreBindingsWorkaround _tool;
+
+     ShutdownHook(BDBStoreBindingsWorkaround owner)
+     {
+         this._tool = owner;
+     }
+
+     public void run()
+     {
+         this._tool.shutdown();
+     }
+ }
+
+ /**
+  * Calls ApplicationRegistry.remove(1), but only when the initialised flag has been set
+  * by a successful loadVirtualHosts(); otherwise does nothing.
+  */
+ protected void shutdown()
+ {
+     if (!_initialised)
+     {
+         return;
+     }
+
+     ApplicationRegistry.remove(1);
+ }
+
+ /**
+  * Runs the workaround: loads the VirtualHosts and, if that succeeded, adds any missing
+  * durable Exchange entries to the stores. Exits the JVM with status 1 when the
+  * VirtualHosts could not be initialised.
+  */
+ protected void start()
+ {
+     _initialised = false;
+     _logger.info("BDBStore BindingsWorkaround process commencing");
+
+     loadVirtualHosts();
+
+     if (_initialised)
+     {
+         addDurableExchangesToStoreIfRequired();
+         _logger.info("Workaround process complete");
+     }
+     else
+     {
+         System.exit(1);
+     }
+ }
+
+ /**
+  * Loads the broker configuration file and initialises the ApplicationRegistry from it.
+  *
+  * If the configuration file does not exist, an error is logged and the method returns with
+  * the initialised flag still false. If loading or initialisation throws, the failure is
+  * logged together with its stack trace and the JVM exits with status 1. On success the
+  * initialised flag is set to true.
+  */
+ private void loadVirtualHosts()
+ {
+     final File configFile = _config.getConfigFile();
+
+     if (!configFile.exists())
+     {
+         _logger.error("Config file not found:" + configFile.getAbsolutePath());
+         _logger.error("Options: [-c <broker config file>] : Defaults to \"$QPID_HOME/etc/config.xml\"");
+         return;
+     }
+
+     _logger.info("Using config file :" + configFile.getAbsolutePath());
+
+     try
+     {
+         _logger.info("Starting the VirtualHosts");
+
+         ConfigurationFileApplicationRegistry registry = new ConfigurationFileApplicationRegistry(configFile);
+
+         // Management must be switched off before the registry is initialised below.
+         disableManagementStartup(registry.getConfiguration());
+
+         // NOTE(review): presumably clears any existing registry instance 1 before installing ours - confirm.
+         ApplicationRegistry.remove(1);
+         ApplicationRegistry.initialise(registry);
+
+         checkMessageStores();
+         _initialised = true;
+     }
+     catch (ConfigurationException e)
+     {
+         // The exception is handed to the logger so the stack trace goes through the logging configuration.
+         _logger.error("Unable to load configuration due to configuration error: " + e.getMessage(), e);
+         System.exit(1);
+     }
+     catch (Exception e)
+     {
+         _logger.error("Unable to load configuration due to: " + e.getMessage(), e);
+         System.exit(1);
+     }
+ }
+
+ private void disableManagementStartup(ServerConfiguration config)
+ {
+ /*
+ * Update the server config to indicate management is disabled, which
+ * should prompt usage of the NoopManagedObjectRegistry instead of
+ * the JMXManagedObjectRegistry.
+ */
+ config.setManagementEnabled(false);
+
+ /*
+ * Set the com.sun.management.jmxremote property (to any value)
+ * if it isnt already set.
+ *
+ * The JMXManagedObjectRegistry checks for this system property before
+ * starting its own ConnectorServer programatically as this (usually)
+ * implies that the JVM was started explicitly with properties to enable
+ * its out-of-the-box JMX agent, to which JMXManagedObjectRegistry defers.
+ */
+ if(System.getProperty("com.sun.management.jmxremote") == null)
+ {
+ System.setProperty("com.sun.management.jmxremote", "8999");
+ }
+ }
+
+ private void checkMessageStores()
+ {
+ Collection<VirtualHost> vhosts = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts();
+
+ for (VirtualHost vhost : vhosts)
+ {
+ if(! (vhost.getMessageStore() instanceof BDBMessageStore) )
+ {
+ _logger.warn("Virtualhost '" + vhost.getName() + "' is not using a BDBMessageStore. "
+ + "No changes will be made to it.");
+ }
+ }
+ }
+
+ /**
+  * Collects the names of the Exchanges that already have an entry in the given store.
+  *
+  * If visiting the store fails, the error is logged with its stack trace and whatever names
+  * were gathered up to that point are returned; no exception is propagated to the caller.
+  *
+  * @param vhost the VirtualHost the store belongs to, used to construct the ExchangeTB binding
+  * @param store the store whose Exchange entries are visited
+  * @return the names of the Exchange entries that were read; never null, possibly empty
+  */
+ private static List<AMQShortString> getBDBStoreExchangeNames(final VirtualHost vhost, final BDBMessageStore store)
+ {
+     final List<AMQShortString> exchanges = new ArrayList<AMQShortString>();
+
+     // Visitor that decodes each Exchange entry and records its name
+     DatabaseVisitor exchangeVisitor = new DatabaseVisitor()
+     {
+         @SuppressWarnings("unchecked")
+         public void visit(DatabaseEntry key, DatabaseEntry value) throws DatabaseException
+         {
+             _logger.debug("Visiting new Exchange store entry");
+
+             TupleBinding binding = new ExchangeTB(vhost);
+             Exchange exchange = (Exchange) binding.entryToObject(value);
+
+             // Entries that decode to null, or to an Exchange without a name, are ignored.
+             if (exchange == null)
+             {
+                 return;
+             }
+
+             AMQShortString name = exchange.getName();
+             if (name != null)
+             {
+                 _logger.debug("Visited store entry for Exchange: " + name);
+                 exchanges.add(name);
+             }
+         }
+     };
+
+     try
+     {
+         store.visitExchanges(exchangeVisitor);
+     }
+     catch (Exception e)
+     {
+         // Logged with the throwable so the stack trace goes through the logging configuration.
+         _logger.error("Error retrieving existing Exchange names from the store: " + e.getMessage(), e);
+     }
+
+     return exchanges;
+ }
+
+ private static void addDurableExchangesToStoreIfRequired()
+ {
+ Collection<VirtualHost> vhosts = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHosts();
+
+ //For each active VHost, add all durable exchanges in the ExchangeRegistry to the store if they arent already in it.
+ for(VirtualHost vhost : vhosts)
+ {
+ _logger.info("Beginning process for VirtualHost: " + vhost.getName());
+
+ BDBMessageStore store;
+
+ if(!(vhost.getMessageStore() instanceof BDBMessageStore) )
+ {
+ _logger.info("Store is not a BDBMessageStore, skipping");
+ continue;
+ }
+ else
+ {
+ store = (BDBMessageStore) vhost.getMessageStore();
+ }
+
+ List<AMQShortString> bdbExchangeNames = getBDBStoreExchangeNames(vhost, store);
+
+ ExchangeRegistry registry = vhost.getExchangeRegistry();
+ Collection<AMQShortString> exchangeNames = registry.getExchangeNames();
+
+ for(AMQShortString exchangeName : exchangeNames)
+ {
+ Exchange exchange = registry.getExchange(exchangeName);
+ if(exchange.isDurable())
+ {
+ if(bdbExchangeNames.contains(exchangeName))
+ {
+ _logger.info("Store already contains entry for Exchange: " + exchangeName);
+ }
+ else
+ {
+ try
+ {
+ _logger.info("Adding store entry for Exchange: " + exchangeName);
+ store.createExchange(exchange);
+ }
+ catch (AMQException e)
+ {
+ _logger.error("Error adding entry to store for Exchange '" + exchangeName + "':" + e.getMessage());
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+ }
+ }
+
+}
More information about the rhmessaging-commits
mailing list