Author: rgemmell
Date: 2010-08-06 10:28:17 -0400 (Fri, 06 Aug 2010)
New Revision: 4190
Removed:
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageReSendTest.java
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreTest.java
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/QueueDeleteWhilstRoutingTest.java
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StoreContextRaceConditionTest.java
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/BDBVMTestCase.java
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/DurableSubscriber.java
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/Publisher.java
Log:
Remove old broken tests which are being replaced
Deleted:
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
===================================================================
--- store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java 2010-08-06 14:10:10 UTC (rev 4189)
+++ store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java 2010-08-06 14:28:17 UTC (rev 4190)
@@ -1,545 +0,0 @@
-/*
- *
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-/*
-import com.sleepycat.je.DatabaseException;
-import junit.framework.Assert;
-import junit.framework.TestCase;
-import junit.framework.TestSuite;
-import org.apache.log4j.Logger;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ContentChunk;
-import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.configuration.ServerConfiguration;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.DefaultExchangeFactory;
-import org.apache.qpid.server.exchange.DirectExchange;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.queue.AMQPriorityQueue;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.txn.NonTransactionalContext;
-import org.apache.qpid.server.txn.TransactionalContext;
-import org.apache.qpid.server.util.NullApplicationRegistry;
-import org.apache.commons.configuration.PropertiesConfiguration;
-
-import java.io.File;
-import java.util.LinkedList;
-import java.util.List;
-*/
-
-import org.apache.qpid.server.store.berkeleydb.utils.BDBVMTestCase;
-
-public class BDBStoreTest extends BDBVMTestCase
-{
- /*
- private static final Logger _log = Logger.getLogger(BDBStoreTest.class);
-
- private BDBMessageStore _store;
- private String STORE_LOCATION = System.getProperty("BDB_WORK") +
"/bdbTestEnv";
-
- private StoreContext _storeContext = new StoreContext();
- private VirtualHost _virtualHost;
-
- private TransactionalContext _txnContext;
- private static final AMQShortString QUEUE1 = new AMQShortString("queue1");
- private static final AMQShortString ME = new AMQShortString("me");
- private static final AMQShortString MYEXCHANGE = new
AMQShortString("myexchange");
- private static final AMQShortString RK = new AMQShortString("rk");
- private static final AMQShortString QUEUE2 = new AMQShortString("queue2");
- private static final AMQShortString HIM = new AMQShortString("him");
- private static final AMQShortString EXCHANGE1 = new
AMQShortString("exchange1");
-
- private static volatile int _loops;
- File BDB_DIR = new File(STORE_LOCATION);
-
- public void setUp() throws Exception
- {
- if (BDB_DIR.exists())
- {
- deleteDirectory(BDB_DIR);
- }
-
- ApplicationRegistry.initialise(new NullApplicationRegistry());
-
- File bdbDir = new File(STORE_LOCATION);
- deleteDirectory(bdbDir);
- BDB_DIR.mkdirs();
-
- _store = new BDBMessageStore();
- _store.configure(BDB_DIR);
-
- PropertiesConfiguration config = new PropertiesConfiguration();
-
- // This is used to test that recovery will correctly reapply configuration to the queues
- // in testRecovery
- config.setProperty("queues.maximumMessageAge", "23");
-
- VirtualHostConfiguration vhostConfig = new
VirtualHostConfiguration("test", config);
-
- _virtualHost = new VirtualHost(vhostConfig, _store);
-
- _store.setVirtualHost(_virtualHost);
-
- _txnContext = new NonTransactionalContext(_store, _storeContext, null, new
LinkedList<RequiredDeliveryException>());
- }
-
- private void reload() throws Exception
- {
- _virtualHost.close();
-
- PropertiesConfiguration env = new PropertiesConfiguration();
-
- env.addProperty("store.environment-path", STORE_LOCATION);
- env.addProperty("store.class",
"org.apache.qpid.server.store.berkeleydb.BDBMessageStore");
-
- _virtualHost = new VirtualHost(new VirtualHostConfiguration("test",
env), null);
-
- _store = (BDBMessageStore) _virtualHost.getMessageStore();
- env.addProperty("store.class",
"org.apache.qpid.server.store.berkeleydb.InspectableBDBMessageStore");
- env.setProperty("queues.maximumMessageAge", 23);
- _virtualHost = new VirtualHost(new VirtualHostConfiguration("test",
env));
- }
-
- public void tearDown() throws Exception
- {
- _virtualHost.close();
-
- ApplicationRegistry.remove(1);
- }
-
- public void testExchangePersistence() throws Exception
- {
- FieldTable queueArguments = new FieldTable();
- Integer priorityLevel = 5;
- queueArguments.put(AMQQueueFactory.X_QPID_PRIORITIES, priorityLevel);
-
- Exchange exchange = new
DefaultExchangeFactory(_virtualHost).createExchange(EXCHANGE1,
DirectExchange.TYPE.getName(), true, false, 0);
-
- assertNotNull("Exchange is null", exchange);
- assertEquals("Exchange Name incorrect", EXCHANGE1,
exchange.getName());
- assertTrue("Exchange is not durable", exchange.isDurable());
-
- _virtualHost.getExchangeRegistry().registerExchange(exchange);
- _store.createExchange(exchange);
-
- //Ensure it is registered correctly
- exchange = _virtualHost.getExchangeRegistry().getExchange(EXCHANGE1);
- assertNotNull("Exchange is null", exchange);
-
- reload();
-
- exchange = _virtualHost.getExchangeRegistry().getExchange(EXCHANGE1);
-
- assertNotNull("Exchange is null", exchange);
- assertEquals("Exchange Name incorrect", EXCHANGE1,
exchange.getName());
- assertTrue("Exchange is not durable", exchange.isDurable());
-
- }
-
- public void testQueuePersistence() throws Exception
- {
-
- FieldTable queueArguments = new FieldTable();
- Integer priorityLevel = 5;
- queueArguments.put(AMQQueueFactory.X_QPID_PRIORITIES, priorityLevel);
-
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false,
_virtualHost, queueArguments);
-
- _store.createQueue(queue, queueArguments);
-
- AMQShortString routingKey = new AMQShortString("Test-Key");
- FieldTable bindArguments = new FieldTable();
- bindArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), "Test = 'MST'");
-
- _store.bindQueue(_virtualHost.getExchangeRegistry().getDefaultExchange(),
routingKey, queue, bindArguments);
-
- reload();
-
- AMQQueue returnedQueue = _virtualHost.getQueueRegistry().getQueue(QUEUE1);
-
- assertEquals("Queue Name has changed", QUEUE1,
returnedQueue.getName());
- assertEquals("Queue Owner has changed", ME, returnedQueue.getOwner());
- assertTrue("Returned Queue is not Durable",
returnedQueue.isDurable());
- assertEquals("Returned Queue is not A Priority Queue",
AMQPriorityQueue.class, returnedQueue.getClass());
- assertEquals("Returned Queue does not have the right number of priorities", priorityLevel.intValue(),
- ((AMQPriorityQueue) returnedQueue).getPriorities());
- assertNotNull("Queue has no exchange binding arguments.",
returnedQueue.getExchangeBindings());
- assertEquals("Incorrect binding count for queue.", 1,
returnedQueue.getExchangeBindings().size());
- assertTrue("Binding does not contain a Selector argument.",
- returnedQueue.getExchangeBindings().get(0).getArguments().containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()));
- }
-
- private MessagePublishInfo createPublishBody()
- {
-
- return new MessagePublishInfo()
- {
-
- public AMQShortString getExchange()
- {
- return MYEXCHANGE;
- }
-
- public void setExchange(AMQShortString exchange)
- {
-
- }
-
- public boolean isImmediate()
- {
- return false;
- }
-
- public boolean isMandatory()
- {
- return true;
- }
-
- public AMQShortString getRoutingKey()
- {
- return RK;
- }
- };
-
- }
-
- private BasicContentHeaderProperties createContentHeaderProperties()
- {
- BasicContentHeaderProperties props = new BasicContentHeaderProperties();
- props.setContentType("text/html");
- props.setMessageId("abc123");
- return props;
- }
-
- private ContentChunk createContentChunk(String bodyText)
- {
- byte[] bodyBytes = bodyText.getBytes();
- final int size = bodyBytes.length;
- final ByteBuffer payload = ByteBuffer.wrap(bodyBytes);
-
- return new ContentChunk()
- {
-
- public int getSize()
- {
- return size;
- }
-
- public ByteBuffer getData()
- {
- return payload;
- }
-
- public void reduceToFit()
- {
- }
- };
-
- }
-
- private ContentHeaderBody createContentHeaderBody(BasicContentHeaderProperties props,
int length)
- {
- MethodRegistry methodRegistry =
MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
- int classForBasic = methodRegistry.createBasicQosOkBody().getClazz();
- return new ContentHeaderBody(classForBasic, 1, props, length);
- }
-
- public void testMessagePersistence() throws DatabaseException, AMQException
- {
- MessagePublishInfo pubBody = createPublishBody();
- BasicContentHeaderProperties props = createContentHeaderProperties();
- String bodyText =
"jfhdjsflsdhfjdshfjdslhfjdslhfsjlhfsjkhfdsjkhfdsjkfhdslkjf";
- ContentChunk body = createContentChunk(bodyText);
-
- ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
-
- _store.storeMessageMetaData(_storeContext, 14L, new MessageMetaData(pubBody, chb,
1));
- _store.storeContentBodyChunk(_storeContext, 14L, 0, body, true);
-
- MessageMetaData mmd = _store.getMessageMetaData(_storeContext, 14L);
- MessagePublishInfo returnedPubBody = mmd.getMessagePublishInfo();
- Assert.assertEquals("Message exchange has changed",
pubBody.getExchange(), returnedPubBody.getExchange());
- Assert.assertEquals("Immediate flag has changed",
pubBody.isImmediate(), returnedPubBody.isImmediate());
- Assert.assertEquals("Mandatory flag has changed",
pubBody.isMandatory(), returnedPubBody.isMandatory());
- Assert.assertEquals("Routing key has changed", pubBody.getRoutingKey(),
returnedPubBody.getRoutingKey());
-
- ContentHeaderBody returnedHeaderBody = mmd.getContentHeaderBody();
- Assert.assertEquals("ContentHeader ClassID has changed", chb.classId,
returnedHeaderBody.classId);
- Assert.assertEquals("ContentHeader weight has changed", chb.weight,
returnedHeaderBody.weight);
- Assert.assertEquals("ContentHeader bodySize has changed", chb.bodySize,
returnedHeaderBody.bodySize);
- BasicContentHeaderProperties returnedProperties = (BasicContentHeaderProperties)
returnedHeaderBody.properties;
- Assert.assertEquals("Property ContentType has changed",
props.getContentTypeAsString(), returnedProperties.getContentTypeAsString());
- Assert.assertEquals("Property MessageID has changed",
props.getMessageIdAsString(), returnedProperties.getMessageIdAsString());
- Assert.assertEquals("MessageMD ChunkCount has changed",
mmd.getContentChunkCount(), 1);
- ContentChunk returnedContentBody = _store.getContentBodyChunk(_storeContext, 14L,
0);
- ByteBuffer returnedPayloadAsBytes = returnedContentBody.getData();
- byte[] returnedPayload = new byte[returnedPayloadAsBytes.remaining()];
- returnedPayloadAsBytes.get(returnedPayload);
- String returnedPayloadString = new String(returnedPayload);
- Assert.assertEquals("Message Payload has changed", bodyText,
returnedPayloadString);
- }
-
- public void testMessageCreateAndDelete() throws Exception
- {
- MessagePublishInfo pubBody = createPublishBody();
- BasicContentHeaderProperties props = createContentHeaderProperties();
- String bodyText = "dsjfhdsjkfhdsjflhsdjfdshfjdlhfjhdljfdhsljkfsh";
- ContentChunk body = createContentChunk(bodyText);
-
- ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
- _store.storeMessageMetaData(_storeContext, 15L, new MessageMetaData(pubBody, chb,
1));
- _store.storeContentBodyChunk(_storeContext, 15L, 0, body, true);
- _store.getContentBodyChunk(_storeContext, 15L, 0);
- _store.removeMessage(_storeContext, 15L);
-
- // the next line should throw since the message id should not be found
- try
- {
- _store.getMessageMetaData(_storeContext, 15L);
- Assert.fail("No exception thrown when message id not found getting metadata");
- }
- catch (AMQException e)
- {
- // pass since exception expected
- }
-
- try
- {
- _store.getContentBodyChunk(_storeContext, 15L, 0);
- Assert.fail("No exception thrown when message id not found getting content chunk");
- }
- catch (AMQException e)
- {
- // pass since exception expected
- }
-
- }
-
- public void testTranCommit() throws Exception
- {
- MessagePublishInfo pubBody = createPublishBody();
- BasicContentHeaderProperties props = createContentHeaderProperties();
- String bodyText = "dsjfhdsjkfhdsjflhsdjfdshfjdlhfjhdljfdhsljkfsh";
- ContentChunk body = createContentChunk(bodyText);
-
- ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
- _store.storeMessageMetaData(_storeContext, 20L, new MessageMetaData(pubBody, chb,
0));
- _store.storeMessageMetaData(_storeContext, 21L, new MessageMetaData(pubBody, chb,
0));
- _store.storeMessageMetaData(_storeContext, 22L, new MessageMetaData(pubBody, chb,
0));
-
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false,
_virtualHost, null);
- _store.createQueue(queue);
-
- _store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, queue, 20L);
- _store.enqueueMessage(_storeContext, queue, 21L);
- _store.commitTran(_storeContext);
-
- List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
- Assert.assertEquals("Enqueued messages have changed", 2,
enqueuedIds.size());
- Long val = enqueuedIds.get(0);
- Assert.assertEquals("First Message is incorrect", 20L,
val.longValue());
- val = enqueuedIds.get(1);
- Assert.assertEquals("Second Message is incorrect", 21L,
val.longValue());
-
- }
-
- public void testTranRollback1() throws Exception
- {
- List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
- assertTrue("Last Test Messages are still present",
enqueuedIds.isEmpty());
-
- MessagePublishInfo pubBody = createPublishBody();
- BasicContentHeaderProperties props = createContentHeaderProperties();
- String bodyText = "dsjfhdsjkfhdsjflhsdjfdshfjdlhfjhdljfdhsljkfsh";
- ContentChunk body = createContentChunk(bodyText);
-
- ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
-
- _store.storeMessageMetaData(_storeContext, 30L, new MessageMetaData(pubBody, chb,
0));
- _store.storeMessageMetaData(_storeContext, 31L, new MessageMetaData(pubBody, chb,
0));
- _store.storeMessageMetaData(_storeContext, 32L, new MessageMetaData(pubBody, chb,
0));
-
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false,
_virtualHost, null);
- _store.createQueue(queue);
-
- _store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, queue, 30L);
- _store.enqueueMessage(_storeContext, queue, 31L);
- _store.commitTran(_storeContext);
-
- _store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, queue, 32L);
- _store.abortTran(_storeContext);
-
- _store.beginTran(_storeContext);
- _store.commitTran(_storeContext);
-
- enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
- assertTrue("Last Test Message is still present",
!enqueuedIds.contains(20L));
- assertEquals("Incorrect Enqueued Message Count:", 2,
enqueuedIds.size());
- Long val = enqueuedIds.get(0);
- assertEquals("First Message is incorrect", 30L, val.longValue());
- val = enqueuedIds.get(1);
- assertEquals("Second Message is incorrect", 31L, val.longValue());
-
- }
-
- public void testTranRollback2() throws Exception
- {
- List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
- assertTrue("Last Test Messages are still present",
enqueuedIds.isEmpty());
-
- MessagePublishInfo pubBody = createPublishBody();
- BasicContentHeaderProperties props = createContentHeaderProperties();
- String bodyText = "dsjfhdsjkfhdsjflhsdjfdshfjdlhfjhdljfdhsljkfsh";
- ContentChunk body = createContentChunk(bodyText);
-
- ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
-
- _store.storeMessageMetaData(_storeContext, 30L, new MessageMetaData(pubBody, chb,
0));
- _store.storeMessageMetaData(_storeContext, 31L, new MessageMetaData(pubBody, chb,
0));
- _store.storeMessageMetaData(_storeContext, 32L, new MessageMetaData(pubBody, chb,
0));
-
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false,
_virtualHost, null);
- _store.createQueue(queue);
-
- _store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, queue, 30L);
- _store.abortTran(_storeContext);
-
- _store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, queue, 31L);
- _store.enqueueMessage(_storeContext, queue, 32L);
- _store.commitTran(_storeContext);
-
- enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
- Assert.assertEquals("Incorrect Enqueued Message Count", 2,
enqueuedIds.size());
- Long val = enqueuedIds.get(0);
- Assert.assertEquals("First Message is incorrect", 31L,
val.longValue());
- val = enqueuedIds.get(1);
- Assert.assertEquals("Second Message is incorrect", 32L,
val.longValue());
- }
-
- public void testRecovery() throws Exception
- {
- List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
- assertTrue("Last Test Messages are still present",
enqueuedIds.isEmpty());
-
- MessagePublishInfo pubBody = createPublishBody();
- BasicContentHeaderProperties props = createContentHeaderProperties();
- String bodyText = "dsjfhdsjkfhdsjflhsdjfdshfjdlhfjhdljfdhsljkfsh";
- ContentChunk body = createContentChunk(bodyText);
-
- ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
-
- _store.storeMessageMetaData(_storeContext, 40L, new MessageMetaData(pubBody, chb,
0));
- _store.storeMessageMetaData(_storeContext, 41L, new MessageMetaData(pubBody, chb,
0));
- _store.storeMessageMetaData(_storeContext, 42L, new MessageMetaData(pubBody, chb,
0));
-
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false,
_virtualHost, null);
- AMQQueue queue2 = AMQQueueFactory.createAMQQueueImpl(QUEUE2, true, HIM, false,
_virtualHost, null);
-
- _store.createQueue(queue);
- _store.createQueue(queue2);
-
- _store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, queue, 40L);
- _store.enqueueMessage(_storeContext, queue, 41L);
- _store.enqueueMessage(_storeContext, queue2, 42L);
- _store.commitTran(_storeContext);
-
- _store.enqueueMessage(_storeContext, queue, 42L);
-
- reload();
-
- try
- {
- AMQQueue q1 = _virtualHost.getQueueRegistry().getQueue(QUEUE1);
- AMQQueue q2 = _virtualHost.getQueueRegistry().getQueue(QUEUE2);
-
- Assert.assertNotNull("Queue1 is was not recovered", q1);
- Assert.assertEquals("Queue1 has incorrect message count", 3,
q1.getMessageCount());
- Assert.assertNotNull("Queue2 is was not recovered", q2);
- Assert.assertEquals("Queue2 has incorrect message count", 1,
q2.getMessageCount());
-
- // Message age is set in setUp
- assertEquals("q1 has an incorrect maximum message age", 23,
q1.getMaximumMessageAge());
- }
- catch (Exception e)
- {
- e.printStackTrace();
- fail(e.getMessage());
- }
-
- }
-
- public void testDequeue() throws AMQException
- {
- MessagePublishInfo pubBody = createPublishBody();
- BasicContentHeaderProperties props = createContentHeaderProperties();
- String bodyText = "dsjfhdsjkfhdsjflhsdjfdshfjdlhfjhdljfdhsljkfsh";
- ContentChunk body = createContentChunk(bodyText);
-
- ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
-
- _store.storeMessageMetaData(_storeContext, 50L, new MessageMetaData(pubBody, chb,
0));
-
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false,
_virtualHost, null);
- _store.createQueue(queue);
-
- _store.enqueueMessage(_storeContext, queue, 50L);
- _store.dequeueMessage(_storeContext, queue, 50L);
- }
-
- public void testQueueRemove() throws AMQException
- {
- AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false,
_virtualHost, null);
- _store.createQueue(queue);
- _store.removeQueue(queue);
- try
- {
- _store.removeQueue(queue);
- Assert.fail("No exception thrown when deleting non-existant queue");
- }
- catch (AMQException e)
- {
- // Pass
- }
- }
-
- public static junit.framework.Test suite()
- {
- return new TestSuite(BDBStoreTest.class);
- }
- */
-
- public void testDummy()
- {
-
- }
-}
Deleted:
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageReSendTest.java
===================================================================
--- store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageReSendTest.java 2010-08-06 14:10:10 UTC (rev 4189)
+++ store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageReSendTest.java 2010-08-06 14:28:17 UTC (rev 4190)
@@ -1,112 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.server.store.berkeleydb;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import org.apache.qpid.server.store.berkeleydb.utils.BDBVMTestCase;
-
-public class MessageReSendTest extends BDBVMTestCase
-{
- protected static final String MESSAGE_ID_PROPERTY = "MessageIDProperty";
-
- public void test() throws Exception
- {
-
- //Send Message
- sendMessages(getConnection(), 1);
- System.err.println("SEND");
-
- //Create Connection
- Connection connection = getConnection();
- System.err.println("RECEIVE");
-
- //Receive Message
- checkMessagesOnQueue(connection, _queue, 1);
- //Close connections
- connection.close();
- System.err.println("VALIDATE");
-
- //Reconnect and ensure message is gone
- connection = getConnection();
- checkMessagesOnQueue(connection, _queue, 0);
- connection.close();
-
- try
- {
- //restart broker
- stopBroker(1);
- System.err.println("START");
- startBroker(1);
- }
- catch (Exception e)
- {
- fail(e.getMessage());
- }
-
- //reconnect and ensure message is gone
- connection = getConnection();
- checkMessagesOnQueue(connection, _queue, 0);
- connection.close();
- }
-
- private void checkMessagesOnQueue(Connection connection, Queue queue, int count)
- {
- try
- {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createConsumer(queue);
-
- connection.start();
-
- Message msg = consumer.receive(1000);
-
- if (count > 0)
- {
- int received = 1;
- while (received < count)
- {
- assertNotNull(msg);
- assertEquals(received, msg.getIntProperty(MESSAGE_ID_PROPERTY));
-
- //get next message
- msg = consumer.receive(1000);
- }
-
- }
- else
- {
- assertNull("Received Message when none expected", msg);
- }
- }
- catch (JMSException e)
- {
- fail(e.getMessage());
- }
- }
-
-}
Deleted:
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreTest.java
===================================================================
--- store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreTest.java 2010-08-06 14:10:10 UTC (rev 4189)
+++ store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageStoreTest.java 2010-08-06 14:28:17 UTC (rev 4190)
@@ -1,39 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import org.apache.commons.configuration.PropertiesConfiguration;
-
-public class MessageStoreTest extends org.apache.qpid.server.store.MessageStoreTest
-{
-
- public void testBDBMessageStore() throws Exception
- {
- PropertiesConfiguration config = new PropertiesConfiguration();
-
- config.addProperty("store.environment-path",
- System.getProperty("QPID_WORK") +
"/BDB_MessageStoreTest");
- config.addProperty("store.class",
"org.apache.qpid.server.store.berkeleydb.BDBMessageStore");
-
- runTestWithStore(config);
- }
-
-}
Deleted:
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/QueueDeleteWhilstRoutingTest.java
===================================================================
--- store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/QueueDeleteWhilstRoutingTest.java 2010-08-06 14:10:10 UTC (rev 4189)
+++ store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/QueueDeleteWhilstRoutingTest.java 2010-08-06 14:28:17 UTC (rev 4190)
@@ -1,204 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.store.berkeleydb.utils.BDBVMTestCase;
-
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.naming.NamingException;
-import java.io.File;
-
-public class QueueDeleteWhilstRoutingTest extends BDBVMTestCase
-{
- private static final Logger _logger =
Logger.getLogger(QueueDeleteWhilstRoutingTest.class);
-
- MessageConsumer _consumer1, _consumer2;
- Session _clientSession1;
- Connection _producerConnection, _clientConnection1;
-
- int brokerID = 2;
-
- /**
- * Issue analysis:
- * When an Exclusive NonDurable queue is created a queueDelete task is added to the sessionCloseTaskList
- * When the last consumer on an autodelete queue closes queueDelete is called.
- *
- * Hence the queue is delted twice. Which would hurt the ref counting of all messages in the consumers
- * unacked map
- *
- * Test Plan:
- *
- * Create two subscribers same topic
- *
- * Send two messages
- *
- * consume one from each consumer to validate that all is good
- *
- * Shutdown persistent broker
- *
- * restart.
- *
- * Expecting failure in broker startup.
- * @throws Exception
- */
- public void test() throws Exception
- {
- _logger.debug("Performing receives");
-
- Message msg1 = _consumer1.receive(1000);
-
- assertNotNull(msg1);
-
- //Check message recevied ok
- assertEquals("Message 1 not received on consumer 1", "Message: 1", ((TextMessage) msg1).getText());
-
- _consumer1.close();
-
- _clientConnection1.close();
-
- _producerConnection.close();
-
- try
- {
- _logger.debug("Shutdown broker in 1 second");
- Thread.sleep(4000);
- }
- catch (InterruptedException e)
- {
- fail(e.getMessage());
- }
-
- //Stop the broker
- stopBroker(brokerID);
-
- try
- {
- _logger.debug("Restart broker in 2 second");
- Thread.sleep(4000);
- }
- catch (InterruptedException e)
- {
- fail(e.getMessage());
- }
-
- //Start the broker
- try
- {
- //FIXME startVMBroker(brokerID, _persistentConfigFile);
- }
- catch (Exception e)
- {
- fail(e.getMessage());
- }
-
- //Test Connection
- _clientConnection1 = getConnection();
-
- _clientConnection1.close();
- }
-
- public void setUp() throws Exception
- {
- super.setUp();
-
- //FIXME startVMBroker(brokerID, _persistentConfigFile);
-
- // Initialise ACLs.
-
- //Create Consumers
- //Create consumer on the temp queue
- Queue requestQueue = (Queue) getInitialContext().lookup("queue");
-
- _clientConnection1 = getConnection();
- _clientSession1 = _clientConnection1.createSession(false,
Session.AUTO_ACKNOWLEDGE);
-
- _queue = _clientSession1.createTemporaryQueue();
-
- _consumer1 = _clientSession1.createConsumer(_queue);
-
- //Start the connection
- _clientConnection1.start();
-
- //Create Producer
- _producerConnection = getConnection();
- final Session producerSession = _producerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
-
- //Create a listener for the messages
- producerSession.createConsumer(requestQueue).setMessageListener(new
MessageListener()
- {
- public void onMessage(final Message message)
- {
- try
- {
- Destination responseQueue = message.getJMSReplyTo();
-
- //Send a response to the message
- producerSession.createProducer(responseQueue)
- .send(producerSession.createTextMessage(((TextMessage)
message).getText()));
- }
- catch (JMSException e)
- {
- fail(e.getMessage());
- }
- }
- });
- //Start the connection
- _producerConnection.start();
-
- //Send two messages
-
- MessageProducer _clientProducer = _clientSession1.createProducer(requestQueue);
- Message msg = _clientSession1.createTextMessage("Message: 1");
- msg.setJMSReplyTo(_queue);
- _clientProducer.send(msg);
-
- msg = _clientSession1.createTextMessage("Message: 2");
- msg.setJMSReplyTo(_queue);
- _clientProducer.send(msg);
- }
-
- public void tearDown() throws Exception
- {
- //Stop the broker
- try
- {
- stopBroker(brokerID);
- }
- catch (Exception e)
- {
- fail(e.getMessage());
- }
-
- super.tearDown();
- }
-
-}
Deleted:
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StoreContextRaceConditionTest.java
===================================================================
--- store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StoreContextRaceConditionTest.java 2010-08-06 14:10:10 UTC (rev 4189)
+++ store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/StoreContextRaceConditionTest.java 2010-08-06 14:28:17 UTC (rev 4190)
@@ -1,161 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.server.store.berkeleydb;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
-import javax.naming.NamingException;
-
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.store.berkeleydb.utils.BDBVMTestCase;
-
-public class StoreContextRaceConditionTest extends BDBVMTestCase
-{
- private static final Logger _logger = Logger.getLogger(StoreContextRaceConditionTest.class);
-
- public void test() throws InterruptedException, NamingException, JMSException
- {
- Runnable test = new Runnable()
- {
- public void run()
- {
-
- //Create Consumer
- Connection connection = null;
-
- Session session = null;
- try
- {
- try
- {
- connection = getConnection();
- }
- catch (Exception e)
- {
- fail("Unable to obtain connection.");
- }
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
- catch (JMSException e)
- {
- return;
- }
-
- try
- {
- int run = 0;
- while (run < 1)
- {
- try
- {
- //Stop the connection to prevent flow
- connection.stop();
- //Create Consumer to receive msgs
- MessageConsumer consumer = session.createConsumer(_queue);
-
- //Send one message to hold up the Async Delivery from purging
- _logger.info("***** CREATED Consumer");
- _TimeToLive = 0L;
- sendMessages(1);
- _logger.info("***** SENT msg 1");
- //Send 1000 msgs that will time out
- _TimeToLive = 1000L;
- sendMessages(50);
- _logger.info("***** SENT TTL msgs");
-
- //Timeout Messages - Note that we
- Thread.sleep(1000);
- _logger.info("***** SLEEP");
-
- //Allw the messages to flow to us
- connection.start();
- _logger.info("***** START Consumer");
- //*** Starts Async process
-
- //Remove the first message so that the async will occcur and start purging.
- consumer.receive(1000);
- _logger.info("***** RECEIVE Consumer");
-
- sendMessages(50);
- _logger.info("***** SENT TTL msgs");
-
- //Close the consumer freeing the QHK thread to doing work
- consumer.close();
- _logger.info("***** CLOSE Consumer");
- //** Allows QueueHouskeeping to run.
- sendMessages(50);
- _logger.info("***** SENT TTL msgs");
-
- run++;
- }
- catch (JMSException e)
- {
-
- }
- catch (InterruptedException e)
- {
- }
- }
- }
- finally
- {
- try
- {
- connection.close();
- }
- catch (JMSException e)
- {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- _logger.info("***** Test Done");
- }
- }
- };
-
- int MAX_THREADS = 1;
-
- Thread[] threads = new Thread[MAX_THREADS];
-
- for (int concurentClients = 0; concurentClients < MAX_THREADS; concurentClients++)
- {
- threads[concurentClients] = new Thread(test);
- threads[concurentClients].start();
- }
-
- for (int concurentClients = 0; concurentClients < MAX_THREADS; concurentClients++)
- {
- threads[concurentClients].join();
- }
- }
-
- public static void main(String[] args) throws Exception, InterruptedException
- {
- StoreContextRaceConditionTest scrc = new StoreContextRaceConditionTest();
-
- scrc.setUp();
- scrc.test();
-// scrc.tearDown();
- }
-
-}
Deleted:
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/BDBVMTestCase.java
===================================================================
--- store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/BDBVMTestCase.java 2010-08-06 14:10:10 UTC (rev 4189)
+++ store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/BDBVMTestCase.java 2010-08-06 14:28:17 UTC (rev 4190)
@@ -1,168 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.utils;
-
-import java.io.File;
-
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.log4j.Level;
-import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
-import org.apache.qpid.test.utils.QpidBrokerTestCase;
-
-public class BDBVMTestCase extends QpidBrokerTestCase
-{
- public static final String BDB_WORK = "BDB_WORK";
- public static final String QPID_WORK = "QPID_WORK";
-
- protected String testWork = null;
-
- protected String BDB_WORK_PRE_TEST;
- protected String QPID_WORK_PRE_TEST;
-
- protected final String QpidHome = System.getProperty("QPID_HOME");
- protected final File _persistentConfigFile = new File(QpidHome, "etc/persistent_config.xml");
- protected Queue _queue;
-
- protected long _TimeToLive = 0L;
- public static final String MESSAGE_ID_PROPERTY = "MessageIDProperty";
-
- public void setUp() throws Exception
- {
- setupWorkDirectory();
-
- //Create the Broker
- super.setUp();
-
- _queue = new AMQQueue("amq.direct", "BDBTestQ");
- }
-
- public void tearDown() throws Exception
- {
- super.tearDown();
-
- if (testWork != null)
- {
- // Clean up the BDB store
- deleteDirectory(new File(testWork));
- testWork = null;
- }
-
- //Reset BDB_WORK
- if (BDB_WORK_PRE_TEST == null)
- {
- System.clearProperty(BDB_WORK);
- }
- else
- {
- System.setProperty(BDB_WORK, BDB_WORK_PRE_TEST);
- }
-
- //Reset QPID_WORK
- if (QPID_WORK_PRE_TEST == null)
- {
- System.clearProperty(QPID_WORK);
- }
- else
- {
- System.setProperty(QPID_WORK, QPID_WORK_PRE_TEST);
- }
- }
-
- public void setupWorkDirectory()
- {
- if (System.getProperty(BDB_WORK) == null)
- {
- fail("BDB_WORK required for BDB tests");
- }
-
- BDB_WORK_PRE_TEST = System.getProperty(BDB_WORK);
- QPID_WORK_PRE_TEST = System.getProperty(QPID_WORK);
-
- //IF BDB_WORK is set but not QPID_WORK then set QPID_WORK to BDB_WORK
- if (QPID_WORK_PRE_TEST == null && BDB_WORK_PRE_TEST != null)
- {
- System.setProperty(QPID_WORK, BDB_WORK_PRE_TEST);
- }
- }
-
- public boolean deleteDirectory(File dir)
- {
- if (dir.isDirectory())
- {
- String[] children = dir.list();
- for (int i = 0; i < children.length; i++)
- {
- if (!deleteDirectory(new File(dir, children[i])))
- {
- return false;
- }
- }
- }
-
- return (dir.delete());
- }
-
- protected void sendMessages(int num) throws JMSException
- {
- Connection producerConnection = null;
- try
- {
- producerConnection = getConnection();
- }
- catch (Exception e)
- {
- fail("Unable to lookup connection in JNDI.");
- }
-
- sendMessages(producerConnection, num);
- }
-
- protected void sendMessages(Connection producerConnection, int num) throws JMSException
- {
- Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-
- //Ensure _queue is created
- producerSession.createConsumer(_queue).close();
-
- MessageProducer producer = producerSession.createProducer(_queue);
-
- producer.setTimeToLive(_TimeToLive);
- producer.setDisableMessageTimestamp(false);
-
- for (int messsageID = 0; messsageID < num; messsageID++)
- {
- TextMessage textMsg = producerSession.createTextMessage("Message " + messsageID);
- textMsg.setIntProperty(MESSAGE_ID_PROPERTY, messsageID);
- producer.send(textMsg);
- }
-
- producerConnection.close();
- }
-}
Deleted:
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/DurableSubscriber.java
===================================================================
--- store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/DurableSubscriber.java 2010-08-06 14:10:10 UTC (rev 4189)
+++ store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/DurableSubscriber.java 2010-08-06 14:28:17 UTC (rev 4190)
@@ -1,112 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.utils;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQTopic;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.Session;
-import javax.jms.TopicSubscriber;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.MessageListener;
-import javax.jms.Message;
-import java.util.List;
-import java.util.LinkedList;
-
-public class DurableSubscriber implements MessageListener
-{
- private static final Logger _logger = LoggerFactory.getLogger(DurableSubscriber.class);
-
- JNDIHelper _jndiHelper;
-
- Session _session;
- TopicSubscriber _subscriber;
- AMQConnection _connection;
- private List<Message> _received;
-
- public static void main(String[] args) throws JMSException
- {
- new Publisher();
- }
-
- public DurableSubscriber() throws JMSException
- {
- this(JNDIHelper.DEFAULT_BROKER, null);
- }
-
- public DurableSubscriber(String broker, String topic) throws JMSException
- {
- this(broker, topic, null);
- }
-
- public DurableSubscriber(String broker, String topicStr, String selector) throws JMSException
- {
- _jndiHelper = new JNDIHelper(broker );
-
- _connection = (AMQConnection) ((ConnectionFactory) _jndiHelper.lookupJNDI(JNDIHelper.CONNECTION_JNDI_NAME)).createConnection();
-
- _jndiHelper.close();
-
- AMQTopic topic = new AMQTopic(_connection, topicStr);
-
- _logger.debug("Create Session");
- _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
- _logger.debug("Create Durable Subscriber on Session");
-
- if (selector != null)
- {
- _subscriber = _session.createDurableSubscriber(topic, "MySubscription", selector, false);
- }
- else
- {
- _subscriber = _session.createDurableSubscriber(topic, "MySubscription");
- }
-
- _received = new LinkedList<Message>();
-
- _subscriber.setMessageListener(this);
-
- _connection.start();
- }
-
- public void close() throws JMSException
- {
- _connection.close();
- }
-
- public void onMessage(Message message)
- {
- _received.add(message);
- }
-
- public List<Message> getMessages()
- {
- return _received;
- }
-
- public void commit() throws JMSException
- {
- _session.commit();
- }
-}
Deleted:
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java
===================================================================
--- store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java 2010-08-06 14:10:10 UTC (rev 4189)
+++ store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java 2010-08-06 14:28:17 UTC (rev 4190)
@@ -1,107 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.utils;
-
-import javax.naming.NamingException;
-import javax.naming.InitialContext;
-import javax.naming.Context;
-import java.util.Properties;
-
-public class JNDIHelper
-{
- public static final String DEFAULT_BROKER = "tcp://localhost:2345";
- public static final String CONNECTION_JNDI_NAME = "local";
-
- public final String INITIAL_CONTEXT_FACTORY = "org.apache.qpid.jndi.PropertiesFileInitialContextFactory";
- public final String CONNECTION_NAME;
-
- InitialContext _ctx;
-
- public JNDIHelper(String broker)
- {
- CONNECTION_NAME = "amqp://guest:guest@clientid/test?brokerlist='" + broker + "'";
- setupJNDI();
- }
-
- /**
- * Lookup the specified name in the JNDI Context.
- *
- * @param name The string name of the object to lookup
- *
- * @return The object or null if nothing exists for specified name
- */
- public Object lookupJNDI(String name)
- {
- try
- {
- return _ctx.lookup(name);
- }
- catch (NamingException e)
- {
- System.err.println("Error looking up '" + name + "' in JNDI Context:" + e);
- }
-
- return null;
- }
-
- /**
- * Setup the JNDI context.
- *
- * In this case we are simply using a Properties object to store the pairing information.
- *
- * Further details can be found on the wiki site here:
- *
- * @see : http://cwiki.apache.org/qpid/how-to-use-jndi.html
- */
- private void setupJNDI()
- {
- // Set the properties ...
- Properties properties = new Properties();
- properties.put(Context.INITIAL_CONTEXT_FACTORY, INITIAL_CONTEXT_FACTORY);
- properties.put("connectionfactory." + CONNECTION_JNDI_NAME, CONNECTION_NAME);
-
- // Create the initial context
- Context ctx = null;
- try
- {
- _ctx = new InitialContext(properties);
- }
- catch (NamingException e)
- {
- System.err.println("Error Setting up JNDI Context:" + e);
- }
- }
-
- /** Close the JNDI Context to keep everything happy. */
- public void close()
- {
- try
- {
- _ctx.close();
- }
- catch (NamingException e)
- {
- System.err.println("Unable to close JNDI Context : " + e);
- }
- }
-}
-
-
Deleted:
store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/Publisher.java
===================================================================
--- store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/Publisher.java 2010-08-06 14:10:10 UTC (rev 4189)
+++ store/trunk/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/Publisher.java 2010-08-06 14:28:17 UTC (rev 4190)
@@ -1,108 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store.berkeleydb.utils;
-
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQTopic;
-import org.apache.qpid.client.BasicMessageProducer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.Session;
-import javax.jms.TopicSubscriber;
-import javax.jms.ConnectionFactory;
-import javax.jms.JMSException;
-import javax.jms.MessageListener;
-import javax.jms.Message;
-import javax.jms.MessageProducer;
-import javax.naming.Context;
-import javax.naming.InitialContext;
-import javax.naming.NamingException;
-import java.util.Properties;
-import java.util.List;
-import java.util.LinkedList;
-
-public class Publisher
-{
- private static final Logger _logger = LoggerFactory.getLogger(Publisher.class);
-
- JNDIHelper _jndiHelper;
-
- Session _session;
-
- MessageProducer _publisher;
-
- AMQConnection _connection;
-
- public static void main(String[] args) throws JMSException
- {
- new Publisher();
- }
-
- public Publisher() throws JMSException
- {
- this(JNDIHelper.DEFAULT_BROKER, null);
- }
-
- public Publisher(String broker, String topicStr) throws JMSException
- {
- _jndiHelper= new JNDIHelper(broker);
-
- _connection = (AMQConnection) ((ConnectionFactory) _jndiHelper.lookupJNDI(JNDIHelper.CONNECTION_JNDI_NAME)).createConnection();
-
- _jndiHelper.close();
-
- AMQTopic topic = new AMQTopic(_connection, topicStr);
-
- _logger.debug("Create Session");
- _session = _connection.createSession(true, Session.SESSION_TRANSACTED);
- _logger.debug("Create publisher on Session");
-
- if (topicStr != null)
- {
- _publisher = _session.createProducer(_session.createTopic(topicStr));
- }
- else
- {
- _publisher = _session.createProducer(null);
- }
- }
-
- public void close() throws JMSException
- {
- _connection.close();
- }
-
- public void commit() throws JMSException
- {
- _session.commit();
- }
-
- public Message createTextMessage(String msg) throws JMSException
- {
- return _session.createTextMessage(msg);
- }
-
- public void send(Message msg) throws JMSException
- {
- _publisher.send( msg);
- }
-}