Author: ritchiem
Date: 2008-09-02 10:31:35 -0400 (Tue, 02 Sep 2008)
New Revision: 2369
Added:
store/branches/java/broker-queue-refactor/java/bdbstore/etc/config.xml
store/branches/java/broker-queue-refactor/java/bdbstore/etc/jmxremote.access
store/branches/java/broker-queue-refactor/java/bdbstore/etc/passwd
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/DurableSubscriber.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/Publisher.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/
store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/
store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/
store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/apache/
store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/apache/qpid/
store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/apache/qpid/server/
store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/apache/qpid/server/store/
store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/
store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
Modified:
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
Log:
RHM-6 :Create BDBStore Upgrade Tool, provided additional test to validate its
functionality.
Added: store/branches/java/broker-queue-refactor/java/bdbstore/etc/config.xml
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/etc/config.xml
(rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/etc/config.xml 2008-09-02
14:31:35 UTC (rev 2369)
@@ -0,0 +1,130 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ -
http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
+<broker>
+ <prefix>${QPID_HOME}</prefix>
+ <work>${QPID_WORK}</work>
+ <conf>${prefix}/etc</conf>
+ <connector>
+ <!-- Uncomment out this block and edit the keystorePath and keystorePassword
+ to enable SSL support
+ <ssl>
+ <enabled>true</enabled>
+ <sslOnly>true</sslOnly>
+ <keystorePath>/path/to/keystore.ks</keystorePath>
+ <keystorePassword>keystorepass</keystorePassword>
+ </ssl>-->
+ <qpidnio>false</qpidnio>
+ <protectio>
+ <enabled>false</enabled>
+ </protectio>
+ <transport>nio</transport>
+ <port>5672</port>
+ <sslport>8672</sslport>
+ <socketReceiveBuffer>32768</socketReceiveBuffer>
+ <socketSendBuffer>32768</socketSendBuffer>
+ </connector>
+ <management>
+ <enabled>true</enabled>
+ <jmxport>8999</jmxport>
+ <security-enabled>false</security-enabled>
+ </management>
+ <advanced>
+ <filterchain enableExecutorPool="true"/>
+ <enablePooledAllocator>false</enablePooledAllocator>
+ <enableDirectBuffers>false</enableDirectBuffers>
+ <framesize>65535</framesize>
+ <compressBufferOnQueue>false</compressBufferOnQueue>
+ <enableJMSXUserID>false</enableJMSXUserID>
+ </advanced>
+
+ <security>
+ <principal-databases>
+ <!-- Example use of Base64 encoded MD5 hashes for authentication via
CRAM-MD5-Hashed -->
+ <principal-database>
+ <name>passwordfile</name>
+
<class>org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase</class>
+ <attributes>
+ <attribute>
+ <name>passwordFile</name>
+ <value>${conf}/passwd</value>
+ </attribute>
+ </attributes>
+ </principal-database>
+ </principal-databases>
+
+ <access>
+
<class>org.apache.qpid.server.security.access.plugins.AllowAll</class>
+ </access>
+ <jmx>
+ <access>${conf}/jmxremote.access</access>
+ <principal-database>passwordfile</principal-database>
+ </jmx>
+ </security>
+
+ <virtualhosts>
+ <directory>${conf}/virtualhosts</directory>
+
+ <virtualhost>
+ <name>localhost</name>
+ <localhost>
+ <store>
+
<class>org.apache.qpid.server.store.MemoryMessageStore</class>
+ </store>
+
+ <housekeeping>
+
<expiredMessageCheckPeriod>20000</expiredMessageCheckPeriod>
+ </housekeeping>
+
+ </localhost>
+ </virtualhost>
+
+ <virtualhost>
+ <name>development</name>
+ <development>
+ <store>
+
<class>org.apache.qpid.server.store.MemoryMessageStore</class>
+ </store>
+ </development>
+ </virtualhost>
+
+ <virtualhost>
+ <name>test</name>
+ <test>
+ <store>
+
<class>org.apache.qpid.server.store.MemoryMessageStore</class>
+ </store>
+ </test>
+ </virtualhost>
+
+ </virtualhosts>
+ <heartbeat>
+ <delay>0</delay>
+ <timeoutFactor>2.0</timeoutFactor>
+ </heartbeat>
+ <queue>
+ <auto_register>true</auto_register>
+ </queue>
+
+ <virtualhosts>${conf}/virtualhosts.xml</virtualhosts>
+</broker>
+
+
Added: store/branches/java/broker-queue-refactor/java/bdbstore/etc/jmxremote.access
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/etc/jmxremote.access
(rev 0)
+++
store/branches/java/broker-queue-refactor/java/bdbstore/etc/jmxremote.access 2008-09-02
14:31:35 UTC (rev 2369)
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#
http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+#Generated by JMX Console : Last edited by user:admin
+#Tue Jun 12 16:46:39 BST 2007
+admin=admin
+guest=readonly
+user=readwrite
Added: store/branches/java/broker-queue-refactor/java/bdbstore/etc/passwd
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/etc/passwd
(rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/etc/passwd 2008-09-02 14:31:35
UTC (rev 2369)
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#
http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+guest:guest
+client:guest
+server:guest
+
Modified:
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
===================================================================
---
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java 2008-09-02
14:30:22 UTC (rev 2368)
+++
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java 2008-09-02
14:31:35 UTC (rev 2369)
@@ -69,7 +69,6 @@
private static volatile int _loops;
private String TEST_LOCATION = "bdbTestEnv";
File BDB_DIR = new File(TEST_LOCATION);
-
public void setUp() throws Exception
{
@@ -83,13 +82,13 @@
BDB_DIR.mkdirs();
_store = new BDBMessageStore();
+ _store.configure(BDB_DIR);
- _store.createEnvironment(BDB_DIR);
- _store.openDatabases();
_virtualHost = new VirtualHost("test", _store);
_store.setVirtualHost(_virtualHost);
- _store.startCommitThread();
+ _store.recover();
+
_txnContext = new NonTransactionalContext(_store, _storeContext, null, new
LinkedList<RequiredDeliveryException>());
}
@@ -111,24 +110,24 @@
private void reload() throws Exception
{
_virtualHost.close();
-
+
PropertiesConfiguration env = new PropertiesConfiguration();
env.addProperty("store.environment-path", "bdbTestEnv");
env.addProperty("store.class",
"org.apache.qpid.server.store.berkeleydb.BDBMessageStore");
_virtualHost = new VirtualHost("test", env);
- _store = (BDBMessageStore)_virtualHost.getMessageStore();
+ _store = (BDBMessageStore) _virtualHost.getMessageStore();
}
public void tearDown() throws Exception
{
_virtualHost.close();
- ApplicationRegistry.removeAll();
+ ApplicationRegistry.remove(1);
}
- public void testExchangePersistence() throws Exception
+ public void testExchangePersistence() throws Exception
{
FieldTable queueArguments = new FieldTable();
Integer priorityLevel = 5;
@@ -332,7 +331,7 @@
{
// pass since exception expected
}
-
+
}
public void testTranCommit() throws Exception
@@ -348,7 +347,7 @@
_store.storeMessageMetaData(_storeContext, 22L, new MessageMetaData(pubBody, chb,
0));
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false,
_virtualHost, null);
- _store.createQueue(queue, null);
+ _store.createQueue(queue);
_store.beginTran(_storeContext);
_store.enqueueMessage(_storeContext, queue, 20L);
@@ -381,7 +380,7 @@
_store.storeMessageMetaData(_storeContext, 32L, new MessageMetaData(pubBody, chb,
0));
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false,
_virtualHost, null);
- _store.createQueue(queue, null);
+ _store.createQueue(queue);
_store.beginTran(_storeContext);
_store.enqueueMessage(_storeContext, queue, 30L);
@@ -395,17 +394,16 @@
_store.beginTran(_storeContext);
_store.commitTran(_storeContext);
- enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
+ enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
assertTrue("Last Test Message is still present",
!enqueuedIds.contains(20L));
assertEquals("Incorrect Enqueued Message Count:", 2,
enqueuedIds.size());
Long val = enqueuedIds.get(0);
assertEquals("First Message is incorrect", 30L, val.longValue());
val = enqueuedIds.get(1);
assertEquals("Second Message is incorrect", 31L, val.longValue());
-
+
}
-
public void testTranRollback2() throws Exception
{
List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
@@ -423,7 +421,7 @@
_store.storeMessageMetaData(_storeContext, 32L, new MessageMetaData(pubBody, chb,
0));
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false,
_virtualHost, null);
- _store.createQueue(queue, null);
+ _store.createQueue(queue);
_store.beginTran(_storeContext);
_store.enqueueMessage(_storeContext, queue, 30L);
@@ -461,8 +459,8 @@
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false,
_virtualHost, null);
AMQQueue queue2 = AMQQueueFactory.createAMQQueueImpl(QUEUE2, true, HIM, false,
_virtualHost, null);
- _store.createQueue(queue, null);
- _store.createQueue(queue2, null);
+ _store.createQueue(queue);
+ _store.createQueue(queue2);
_store.beginTran(_storeContext);
_store.enqueueMessage(_storeContext, queue, 40L);
@@ -504,7 +502,7 @@
_store.storeMessageMetaData(_storeContext, 50L, new MessageMetaData(pubBody, chb,
0));
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false,
_virtualHost, null);
- _store.createQueue(queue, null);
+ _store.createQueue(queue);
_store.enqueueMessage(_storeContext, queue, 50L);
_store.dequeueMessage(_storeContext, queue, 50L);
@@ -513,7 +511,7 @@
public void testQueueRemove() throws AMQException
{
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false,
_virtualHost, null);
- _store.createQueue(queue, null);
+ _store.createQueue(queue);
_store.removeQueue(queue);
try
{
Added:
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
===================================================================
---
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
(rev 0)
+++
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java 2008-09-02
14:31:35 UTC (rev 2369)
@@ -0,0 +1,218 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.berkeleydb.utils.Publisher;
+import org.apache.qpid.server.store.berkeleydb.utils.DurableSubscriber;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.util.FileUtils;
+import org.apache.commons.configuration.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import junit.framework.TestCase;
+
+import java.io.File;
+import java.io.IOException;
+
+public class BDBUpgradeTest extends TestCase
+{
+ protected static final Logger _logger =
LoggerFactory.getLogger(BDBUpgradeTest.class);
+
+ final String BDBHome = System.getProperty("BDB_HOME");
+ final File _configFile = new File(BDBHome, "etc/config.xml");
+
+ private String VIRTUALHOST = "test";
+
+ private static final String VERSION_1 = "1";
+ private static final String VERSION_2 = "2";
+
+ private String _topic = "MyDurableSubscriptionTestTopic";
+
+ String _fromDir =
System.getProperty("QPID_WORK")+"/version1Store";
+ String _toDir =
System.getProperty("QPID_WORK")+"/version2Store";
+
+
+ public void setUp() throws IOException
+ {
+ assertNotNull("QPID_WORK must be set",
System.getProperty("QPID_WORK"));
+
+ File directory = new File(_toDir);
+
+ if (directory.exists() && directory.isDirectory())
+ {
+ FileUtils.delete(directory, true);
+ }
+
+ directory = new File(_fromDir);
+
+ if (directory.exists() && directory.isDirectory())
+ {
+ FileUtils.delete(directory, true);
+ }
+
+ }
+
+
+ public void testDurababilitySelectors() throws Exception
+ {
+ String broker = "vm://:1";
+
+ startBroker(1, VERSION_1);
+
+ new DurableSubscriber(broker, _topic, null).close();
+
+ sendMessages(broker, _topic, 10);
+
+ sendAndCheckDurableSubscriber(broker, false, true, 5, "odd=true");
+
+ Thread.sleep(2000);
+
+ stopBroker(1);
+
+ upgradeBroker();
+
+ broker = "vm://:2";
+
+ startBroker(2, VERSION_2);
+
+ //Ensure msg were transitioned to new broker
+ sendAndCheckDurableSubscriber(broker, false, false, 5, null);
+
+ //Reset the Selector Pattern
+ new DurableSubscriber(broker, _topic, "odd=true").close();
+
+ stopBroker(2);
+
+ startBroker(2, VERSION_2);
+
+ //Ensure that the selector was preseved on restart and caused all msgs to be
removed.
+ sendAndCheckDurableSubscriber(broker, false, false, 0, null);
+ stopBroker(2);
+ }
+
+ public void testDurabability() throws Exception
+ {
+ String broker = "vm://:1";
+
+ startBroker(1, VERSION_1);
+
+ sendAndCheckDurableSubscriber(broker, true, false, 10, null);
+
+ stopBroker(1);
+
+ startBroker(1, VERSION_1);
+
+ sendAndCheckDurableSubscriber(broker, false, true, 10, null);
+
+ stopBroker(1);
+ }
+
+ private void upgradeBroker() throws Exception
+ {
+ new BDBStoreUpgrade(_fromDir, _toDir).upgradeFromVersion(1);
+ }
+
+ private void stopBroker(int port)
+ {
+ TransportConnection.killVMBroker(port);
+ ApplicationRegistry.remove(port);
+ }
+
+ private void startBroker(int port, String version) throws Exception
+ {
+ //Create a Version 1 store
+ ConfigurationFileApplicationRegistry config = new
ConfigurationFileApplicationRegistry(_configFile);
+
+ //Disable management on broker.
+ config.getConfiguration().setProperty("management.enabled",
"false");
+
+ Configuration testVirtualhost =
config.getConfiguration().subset("virtualhosts.virtualhost." + VIRTUALHOST);
+ testVirtualhost.setProperty("store.class",
"org.apache.qpid.server.store.berkeleydb.BDBMessageStore");
+ testVirtualhost.setProperty("store." +
BDBMessageStore.ENVIRONMENT_PATH_PROPERTY, "${work}/version" + port +
"Store");
+ testVirtualhost.setProperty("store.version", version);
+
+ ApplicationRegistry.initialise(config, port);
+ TransportConnection.createVMBroker(port);
+
+ }
+
+ public void sendAndCheckDurableSubscriber(String broker, boolean send, boolean
commitRecieved, int count, String selector)
+ throws Exception
+ {
+
+ DurableSubscriber subscriber = new DurableSubscriber(broker, _topic, selector);
+
+ try
+ {
+ if (send)
+ {
+ sendMessages(broker, _topic, count);
+ }
+
+ //Wait for messages to arrive.
+ // 1sec should be enough for delivery of 10 msgs.
+ int loop = 0;
+
+ Thread.sleep(1000);
+
+ while (subscriber.getMessages().size() < count)
+ {
+ assertTrue("Waited more than 5 second for " + count +
"messages to be delivered:"
+ + subscriber.getMessages().size(), loop < 5);
+ loop++;
+ Thread.sleep(1000);
+ }
+
+ assertEquals("Incorrect number of messages received", count,
subscriber.getMessages().size());
+
+ if (commitRecieved)
+ {
+ subscriber.commit();
+ }
+ }
+ finally
+ {
+ subscriber.close();
+ }
+ }
+
+ private void sendMessages(String broker, String topic, int totalMessages) throws
JMSException
+ {
+
+ Publisher publisher = new Publisher(broker, topic);
+
+ for (int i = 0; i < totalMessages; i++)
+ {
+ Message msg = publisher.createTextMessage("message " + i);
+ msg.setBooleanProperty("odd", i % 2 == 0);
+ publisher.send(msg);
+ }
+
+ publisher.commit();
+ }
+
+}
Added:
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/DurableSubscriber.java
===================================================================
---
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/DurableSubscriber.java
(rev 0)
+++
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/DurableSubscriber.java 2008-09-02
14:31:35 UTC (rev 2369)
@@ -0,0 +1,112 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.utils;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQTopic;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Session;
+import javax.jms.TopicSubscriber;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageListener;
+import javax.jms.Message;
+import java.util.List;
+import java.util.LinkedList;
+
+/**
+ * Test helper that connects to a broker, creates a transacted session with a durable topic
+ * subscription named "MySubscription", and collects every message passed to its listener.
+ */
+public class DurableSubscriber implements MessageListener
+{
+    private static final Logger _logger = LoggerFactory.getLogger(DurableSubscriber.class);
+
+    JNDIHelper _jndiHelper;
+
+    Session _session;
+    TopicSubscriber _subscriber;
+    AMQConnection _connection;
+    private List<Message> _received;
+
+    public static void main(String[] args) throws JMSException
+    {
+        // Was "new Publisher()", a copy/paste slip from Publisher.main.
+        new DurableSubscriber();
+    }
+
+    /** Subscribes on {@link JNDIHelper#DEFAULT_BROKER} with a null topic name and no selector. */
+    public DurableSubscriber() throws JMSException
+    {
+        this(JNDIHelper.DEFAULT_BROKER, null);
+    }
+
+    /** Subscribes to the named topic on the given broker without a selector. */
+    public DurableSubscriber(String broker, String topic) throws JMSException
+    {
+        this(broker, topic, null);
+    }
+
+    /**
+     * Connects to the broker, creates the durable subscription and starts delivery.
+     *
+     * @param broker   broker URL handed to {@link JNDIHelper}
+     * @param topicStr name of the topic to subscribe to
+     * @param selector message selector for the subscription, or null for none
+     * @throws JMSException if the connection, session or subscription cannot be created
+     */
+    public DurableSubscriber(String broker, String topicStr, String selector) throws JMSException
+    {
+        _jndiHelper = new JNDIHelper(broker);
+
+        _connection = (AMQConnection) ((ConnectionFactory) _jndiHelper.lookupJNDI(JNDIHelper.CONNECTION_JNDI_NAME)).createConnection();
+
+        // The context is only needed to obtain the connection factory.
+        _jndiHelper.close();
+
+        AMQTopic topic = new AMQTopic(_connection, topicStr);
+
+        _logger.debug("Create Session");
+        _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+        _logger.debug("Create Durable Subscriber on Session");
+
+        if (selector != null)
+        {
+            _subscriber = _session.createDurableSubscriber(topic, "MySubscription", selector, false);
+        }
+        else
+        {
+            _subscriber = _session.createDurableSubscriber(topic, "MySubscription");
+        }
+
+        // onMessage() is presumably invoked on a provider delivery thread while callers poll
+        // getMessages() from their own thread, so the list is wrapped to be thread-safe.
+        _received = java.util.Collections.synchronizedList(new LinkedList<Message>());
+
+        _subscriber.setMessageListener(this);
+
+        _connection.start();
+    }
+
+    /** Closes the underlying connection. */
+    public void close() throws JMSException
+    {
+        _connection.close();
+    }
+
+    /** Records each delivered message in arrival order. */
+    public void onMessage(Message message)
+    {
+        _received.add(message);
+    }
+
+    /**
+     * Returns the live list of messages received so far. Individual calls such as size() are
+     * synchronized; callers that iterate must synchronize on the returned list themselves.
+     */
+    public List<Message> getMessages()
+    {
+        return _received;
+    }
+
+    /** Commits the transacted session. */
+    public void commit() throws JMSException
+    {
+        _session.commit();
+    }
+}
Added:
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java
===================================================================
---
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java
(rev 0)
+++
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java 2008-09-02
14:31:35 UTC (rev 2369)
@@ -0,0 +1,107 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.utils;
+
+import javax.naming.NamingException;
+import javax.naming.InitialContext;
+import javax.naming.Context;
+import java.util.Properties;
+
+public class JNDIHelper
+{
+ public static final String DEFAULT_BROKER = "tcp://localhost:2345";
+ public static final String CONNECTION_JNDI_NAME = "local";
+
+ public final String INITIAL_CONTEXT_FACTORY =
"org.apache.qpid.jndi.PropertiesFileInitialContextFactory";
+ public final String CONNECTION_NAME;
+
+ InitialContext _ctx;
+
+ public JNDIHelper(String broker)
+ {
+ CONNECTION_NAME = "amqp://guest:guest@clientid/test?brokerlist='" +
broker + "'";
+ setupJNDI();
+ }
+
+ /**
+ * Lookup the specified name in the JNDI Context.
+ *
+ * @param name The string name of the object to lookup
+ *
+ * @return The object or null if nothing exists for specified name
+ */
+ public Object lookupJNDI(String name)
+ {
+ try
+ {
+ return _ctx.lookup(name);
+ }
+ catch (NamingException e)
+ {
+ System.err.println("Error looking up '" + name + "' in
JNDI Context:" + e);
+ }
+
+ return null;
+ }
+
+ /**
+ * Setup the JNDI context.
+ *
+ * In this case we are simply using a Properties object to store the pairing
information.
+ *
+ * Further details can be found on the wiki site here:
+ *
+ * @see :
http://cwiki.apache.org/qpid/how-to-use-jndi.html
+ */
+ private void setupJNDI()
+ {
+ // Set the properties ...
+ Properties properties = new Properties();
+ properties.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY);
+ properties.put("connectionfactory." + CONNECTION_JNDI_NAME,
CONNECTION_NAME);
+
+ // Create the initial context
+ Context ctx = null;
+ try
+ {
+ _ctx = new InitialContext(properties);
+ }
+ catch (NamingException e)
+ {
+ System.err.println("Error Setting up JNDI Context:" + e);
+ }
+ }
+
+ /** Close the JNDI Context to keep everything happy. */
+ public void close()
+ {
+ try
+ {
+ _ctx.close();
+ }
+ catch (NamingException e)
+ {
+ System.err.println("Unable to close JNDI Context : " + e);
+ }
+ }
+}
+
+
Added:
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/Publisher.java
===================================================================
---
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/Publisher.java
(rev 0)
+++
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/Publisher.java 2008-09-02
14:31:35 UTC (rev 2369)
@@ -0,0 +1,108 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb.utils;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQTopic;
+import org.apache.qpid.client.BasicMessageProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.Session;
+import javax.jms.TopicSubscriber;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageListener;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import java.util.Properties;
+import java.util.List;
+import java.util.LinkedList;
+
+/**
+ * Test helper that connects to a broker and publishes messages through a transacted session,
+ * either to a named topic or via a producer created with no destination.
+ */
+public class Publisher
+{
+    private static final Logger _logger = LoggerFactory.getLogger(Publisher.class);
+
+    JNDIHelper _jndiHelper;
+
+    Session _session;
+
+    MessageProducer _publisher;
+
+    AMQConnection _connection;
+
+    public static void main(String[] args) throws JMSException
+    {
+        new Publisher();
+    }
+
+    /** Connects to {@link JNDIHelper#DEFAULT_BROKER} with a producer that has no destination. */
+    public Publisher() throws JMSException
+    {
+        this(JNDIHelper.DEFAULT_BROKER, null);
+    }
+
+    /**
+     * Connects to the broker and creates a producer on a transacted session.
+     *
+     * @param broker   broker URL handed to {@link JNDIHelper}
+     * @param topicStr name of the topic to publish to, or null to create the producer with a
+     *                 null destination
+     * @throws JMSException if the connection, session or producer cannot be created
+     */
+    public Publisher(String broker, String topicStr) throws JMSException
+    {
+        _jndiHelper = new JNDIHelper(broker);
+
+        _connection = (AMQConnection) ((ConnectionFactory) _jndiHelper.lookupJNDI(JNDIHelper.CONNECTION_JNDI_NAME)).createConnection();
+
+        // The context is only needed to obtain the connection factory.
+        _jndiHelper.close();
+
+        // An unused AMQTopic used to be constructed here, even when topicStr was null;
+        // the destination is created from the session below instead.
+
+        _logger.debug("Create Session");
+        _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
+        _logger.debug("Create publisher on Session");
+
+        if (topicStr != null)
+        {
+            _publisher = _session.createProducer(_session.createTopic(topicStr));
+        }
+        else
+        {
+            _publisher = _session.createProducer(null);
+        }
+    }
+
+    /** Closes the underlying connection. */
+    public void close() throws JMSException
+    {
+        _connection.close();
+    }
+
+    /** Commits the transacted session, making the messages sent since the last commit visible. */
+    public void commit() throws JMSException
+    {
+        _session.commit();
+    }
+
+    /** Creates a text message on this publisher's session. */
+    public Message createTextMessage(String msg) throws JMSException
+    {
+        return _session.createTextMessage(msg);
+    }
+
+    /** Sends the message through this publisher's producer. */
+    public void send(Message msg) throws JMSException
+    {
+        _publisher.send(msg);
+    }
+}
Added:
store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
===================================================================
---
store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
(rev 0)
+++
store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java 2008-09-02
14:31:35 UTC (rev 2369)
@@ -0,0 +1,244 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *
http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.qpid.server.store.MemoryMessageStore;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.AMQException;
+
+import java.io.File;
+
+import com.sleepycat.je.DatabaseEntry;
+import com.sleepycat.je.DatabaseException;
+import com.sleepycat.je.Database;
+import com.sleepycat.bind.tuple.TupleBinding;
+import com.sleepycat.bind.tuple.ByteBinding;
+
+/**
+ * This is a simple BerkeleyDB Store upgrade tool that will upgrade a V1 Store to a V2 Store.
+ *
+ * NOTE: No checks are in place to validate that the input is V1.
+ *
+ * Currently the upgrade is fixed from v1 -> v2.
+ * Only the Queue and Binding databases are migrated; all other databases are copied as DB entries.
+ *
+ * Improvements:
+ * - Add List BDBMessageStore.getDatabases(); this can then be iterated to guard against new DBs being added.
+ * - Add a version value into the store so that a quick check can be performed before performing the upgrades.
+ * - A version in the store would allow automated upgrade or, later, with more versions available, interactive upgrade.
+ * - Currently only the Queue and Binding DBs are processed for upgrade; all the other DB data is copied between stores.
+ * - Add process logging and disable all Store and Qpid logging.
+ */
+public class BDBStoreUpgrade
+{
+ private static final String FROM_DIR = "-input"; // command-line flag expected before the path of the store to upgrade
+ private static final String TO_DIR = "-output"; // command-line flag expected before the path of the upgraded store
+
+ private static final Logger _logger =
LoggerFactory.getLogger(BDBStoreUpgrade.class); // class logger
+ /** The Store Directory that needs upgrading */
+ File _fromDir;
+ /** The Directory that will be made to contain the upgraded store */
+ File _toDir;
+
+ /** The Old Store */
+ BDBMessageStore _oldMessageStore;
+ /** The New Store */
+ BDBMessageStore _newMessageStore;
+ /** A VHost used in the migration of the queues from the old VHost */
+ VirtualHost _newVirtualHost;
+ /** A VHost used in the creation of the queues from the old store */
+ VirtualHost _oldVirtualHost;
+
+    /**
+     * Prepares an upgrade from an existing store directory into a new, not yet existing directory.
+     *
+     * The source directory must already exist; it is never created here, because creating it
+     * would let the tool report a successful upgrade of an empty store when the path is mistyped.
+     * The target directory must not exist and is created by this constructor.
+     *
+     * @param fromDir path of the store to upgrade
+     * @param toDir   path at which the upgraded store will be written
+     *
+     * @throws IllegalArgumentException if fromDir is not an existing directory, if toDir already
+     *                                  exists, or if toDir cannot be created
+     */
+    public BDBStoreUpgrade(String fromDir, String toDir)
+    {
+        _fromDir = new File(fromDir);
+        if (!_fromDir.isDirectory())
+        {
+            throw new IllegalArgumentException("BDBStore path " + fromDir + " could not be read. "
+                                               + "Ensure the path is correct and that the permissions are correct.");
+        }
+
+        _toDir = new File(toDir);
+        if (_toDir.exists())
+        {
+            // Refuse to write into an existing location so that no existing data is touched.
+            throw new IllegalArgumentException("Environment path " + toDir + " already exists. ");
+        }
+
+        if (!_toDir.mkdirs())
+        {
+            throw new IllegalArgumentException("Environment path " + toDir + " could not be created. "
+                                               + "Ensure the path is correct and that the permissions are correct.");
+        }
+    }
+
+    /**
+     * Upgrades the store in the directory given at construction time, writing the
+     * result to the target directory given at construction time.
+     *
+     * @param version the version of the store being upgraded
+     *
+     * @throws Exception if the upgrade, or the closing of the stores afterwards, fails
+     */
+    public void upgradeFromVersion(int version) throws Exception
+    {
+        final File source = _fromDir;
+        final File target = _toDir;
+
+        upgradeFromVersion(version, source, target);
+    }
+
+    /**
+     * Upgrade a Store of a specified version to the latest version.
+     *
+     * The stores, the virtual hosts and the ApplicationRegistry instance are released when the
+     * method finishes, whether the migration succeeded or not.
+     *
+     * @param version the version of the current store; every value is currently handled as version 1
+     * @param fromDir the directory with the old Store
+     * @param toDir   the directory to hold the newly upgraded Store
+     *
+     * @throws Exception         due to VirtualHost/MessageStore.close() declaring a broad exception
+     * @throws DatabaseException if there is a problem with the store formats
+     * @throws AMQException      if there is an issue creating Qpid data structures
+     */
+    public void upgradeFromVersion(int version, File fromDir, File toDir) throws Exception
+    {
+        // Reset so that the clean-up below only touches what this call has opened.
+        _oldMessageStore = null;
+        _newMessageStore = null;
+        _newVirtualHost = null;
+        _oldVirtualHost = null;
+
+        boolean upgraded = false;
+        try
+        {
+            // Note the name of the VHosts is not important, the store does not record the name of the vhost.
+            _newVirtualHost = new VirtualHost("Upgraded", new MemoryMessageStore());
+            _oldVirtualHost = new VirtualHost("Old", new MemoryMessageStore());
+
+            //Create a new messageStore
+            _newMessageStore = new BDBMessageStore();
+            _newMessageStore.configure(_newVirtualHost, toDir);
+
+            //Load the old MessageStore
+            switch (version)
+            {
+                // Only a v1 -> v2 upgrade exists, so any version value falls through to it.
+                default:
+                case 1:
+                    _oldMessageStore = new BDBMessageStore(1);
+                    _oldMessageStore.configure(_oldVirtualHost, fromDir);
+                    upgradeFromVersion_1();
+                    break;
+            }
+
+            upgraded = true;
+        }
+        finally
+        {
+            if (upgraded)
+            {
+                releaseResources();
+            }
+            else
+            {
+                // A failure is already propagating; log clean-up problems instead of masking it.
+                try
+                {
+                    releaseResources();
+                }
+                catch (Exception e)
+                {
+                    _logger.warn("Unable to cleanly release resources after a failed upgrade", e);
+                }
+            }
+        }
+    }
+
+    /** Closes whichever stores and virtual hosts were opened, then removes the ApplicationRegistry instance. */
+    private void releaseResources() throws Exception
+    {
+        try
+        {
+            if (_oldMessageStore != null)
+            {
+                _oldMessageStore.close();
+            }
+            if (_newMessageStore != null)
+            {
+                _newMessageStore.close();
+            }
+            if (_newVirtualHost != null)
+            {
+                _newVirtualHost.close();
+            }
+            if (_oldVirtualHost != null)
+            {
+                _oldVirtualHost.close();
+            }
+        }
+        finally
+        {
+            //Shutdown the AR that the Vhosts will have created.
+            ApplicationRegistry.remove(1);
+        }
+    }
+
+    /**
+     * Migrates a version 1 store into the new store.
+     *
+     * The meta data, content, delivery and exchange databases are copied entry for entry.
+     * Queues are decoded with the old store's tuple binding and recreated in the new store.
+     * Binding keys are decoded with the old store's tuple binding and re-encoded with the
+     * new store's tuple binding.
+     *
+     * @throws AMQException      if a Qpid data structure cannot be created
+     * @throws DatabaseException if a database cannot be read or written
+     */
+    private void upgradeFromVersion_1() throws AMQException, DatabaseException
+    {
+        // _messageMetaDataDb and _messageContentDb are copied as they are.
+        moveContents(_oldMessageStore.getMetaDataDb(), _newMessageStore.getMetaDataDb());
+        moveContents(_oldMessageStore.getContentDb(), _newMessageStore.getContentDb());
+
+        // _queueDb: the old store's tuple binding does the parsing of each stored queue.
+        final TupleBinding oldQueueCodec = _oldMessageStore.getQueueTupleBindingFactory().getInstance();
+        BDBMessageStore.DatabaseVisitor queueMigrator = new BDBMessageStore.DatabaseVisitor()
+        {
+            public void visit(DatabaseEntry key, DatabaseEntry value) throws AMQException
+            {
+                AMQQueue migrated = (AMQQueue) oldQueueCodec.entryToObject(value);
+
+                _newMessageStore.createQueue(migrated);
+                // The queue is registered with the new store's registry. The original author notes that
+                // a queue creates a message delivery thread pool which must be shut down for the broker
+                // to stop cleanly, and that calling queue.stop() was the alternative considered.
+                _newMessageStore.getQueueRegistry().registerQueue(migrated);
+            }
+        };
+        _oldMessageStore.visitQueues(queueMigrator);
+
+        // _deliveryDb and _exchangeDb are copied as they are.
+        moveContents(_oldMessageStore.getDeliveryDb(), _newMessageStore.getDeliveryDb());
+        moveContents(_oldMessageStore.getExchangesDb(), _newMessageStore.getExchangesDb());
+
+        // _queueBindingsDb: keys are re-encoded, values are replaced.
+        final TupleBinding oldBindingCodec = _oldMessageStore.getBindingTupleBindingFactory().getInstance();
+        BDBMessageStore.DatabaseVisitor bindingMigrator = new BDBMessageStore.DatabaseVisitor()
+        {
+            public void visit(DatabaseEntry key, DatabaseEntry value) throws AMQException, DatabaseException
+            {
+                BindingKey decoded = (BindingKey) oldBindingCodec.entryToObject(key);
+
+                DatabaseEntry rewrittenKey = new DatabaseEntry();
+                _newMessageStore.getBindingTupleBindingFactory().getInstance().objectToEntry(decoded, rewrittenKey);
+
+                // The visited value is overwritten with a single zero byte before being stored.
+                ByteBinding.byteToEntry((byte) 0, value);
+                _newMessageStore.getBindingsDb().put(null, rewrittenKey, value);
+            }
+        };
+        _oldMessageStore.visitBindings(bindingMigrator);
+    }
+
+    /**
+     * Copies every entry visited in the old database into the new database unchanged.
+     *
+     * @param oldDatabase the database of the old store to read from
+     * @param newDatabase the database of the new store to write to
+     *
+     * @throws AMQException      if visiting the old database fails
+     * @throws DatabaseException if an entry cannot be read or written
+     */
+    private void moveContents(Database oldDatabase, final Database newDatabase) throws AMQException, DatabaseException
+    {
+        BDBMessageStore.DatabaseVisitor copier = new BDBMessageStore.DatabaseVisitor()
+        {
+            public void visit(DatabaseEntry key, DatabaseEntry value) throws AMQException, DatabaseException
+            {
+                // No explicit transaction is passed to the put.
+                newDatabase.put(null, key, value);
+            }
+        };
+
+        _oldMessageStore.visitDatabase(oldDatabase, copier);
+    }
+
+    /**
+     * Prints the command-line syntax to standard out.
+     *
+     * The flag names are taken from the constants that main() checks, so the printed
+     * text cannot drift from the flags that are actually accepted.
+     */
+    private static void usage()
+    {
+        System.out.println("usage: " + FROM_DIR + " <path-to-store-to-upgrade> "
+                           + TO_DIR + " <path-to-location-to-write-upgraded-db>");
+    }
+
+ public static void main(String[] args) throws Exception
+ {
+ String fromDir, toDir;
+
+ if (args.length < 4 ||
+ (!args[0].equals(FROM_DIR) && !args[2].equals(TO_DIR)))
+ {
+ usage();
+ System.exit(1);
+ }
+
+ fromDir = args[1];
+ toDir = args[3];
+
+ new BDBStoreUpgrade(fromDir, toDir).upgradeFromVersion(1);
+
+ System.out.println("Upgrade complete");
+ }
+}