rhmessaging commits: r3128 - store/trunk/cpp/lib/jrnl.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2009-02-23 15:03:59 -0500 (Mon, 23 Feb 2009)
New Revision: 3128
Modified:
store/trunk/cpp/lib/jrnl/enq_rec.cpp
Log:
Fix for BZ 486952 "qpidd+store loadContent() failed: jexception 0x0a00 data_tok::set_rstate( ) threw JERR_MTOK_ILLEGALSTATE: Attempted to change to illegal state.".
Modified: store/trunk/cpp/lib/jrnl/enq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_rec.cpp 2009-02-23 19:13:41 UTC (rev 3127)
+++ store/trunk/cpp/lib/jrnl/enq_rec.cpp 2009-02-23 20:03:59 UTC (rev 3128)
@@ -8,7 +8,7 @@
*
* \author Kim van der Riet
*
- * Copyright (c) 2007, 2008 Red Hat, Inc.
+ * Copyright (c) 2007, 2008, 2009 Red Hat, Inc.
*
* This file is part of the Qpid async store library msgstore.so.
*
@@ -261,10 +261,11 @@
std::size_t rd_cnt = 0;
if (rec_offs_dblks) // Continuation of record on new page
{
- const u_int32_t hdr_data_dblks = size_dblks(enq_hdr::size() + _enq_hdr._xidsize +
- _enq_hdr._dsize);
- const u_int32_t hdr_tail_dblks = size_dblks(enq_hdr::size() + _enq_hdr._xidsize +
- _enq_hdr._dsize + rec_tail::size());
+ const u_int32_t hdr_xid_data_size = enq_hdr::size() + _enq_hdr._xidsize +
+ (_enq_hdr.is_external() ? 0 : _enq_hdr._dsize);
+ const u_int32_t hdr_xid_data_tail_size = hdr_xid_data_size + rec_tail::size();
+ const u_int32_t hdr_data_dblks = size_dblks(hdr_xid_data_size);
+ const u_int32_t hdr_tail_dblks = size_dblks(hdr_xid_data_tail_size);
const std::size_t rec_offs = rec_offs_dblks * JRNL_DBLK_SIZE;
const std::size_t offs = rec_offs - enq_hdr::size();
@@ -281,7 +282,7 @@
chk_tail();
rd_cnt += sizeof(_enq_tail);
}
- else if (offs < _enq_hdr._xidsize + _enq_hdr._dsize)
+ else if (offs < _enq_hdr._xidsize + _enq_hdr._dsize && !_enq_hdr.is_external())
{
// some data still outstanding, copy remainder of data and tail
const std::size_t data_offs = offs - _enq_hdr._xidsize;
@@ -313,7 +314,7 @@
std::memcpy((char*)_buff + offs, rptr, rem);
rd_cnt += rem;
}
- else if (offs < _enq_hdr._xidsize + _enq_hdr._dsize)
+ else if (offs < _enq_hdr._xidsize + _enq_hdr._dsize && !_enq_hdr.is_external())
{
// some data still outstanding, copy remainder of data
const std::size_t data_offs = offs - _enq_hdr._xidsize;
@@ -360,11 +361,12 @@
_buff = std::malloc(_enq_hdr._xidsize + (_enq_hdr.is_external() ? 0 : _enq_hdr._dsize));
MALLOC_CHK(_buff, "_buff", "enq_rec", "decode");
- const u_int32_t hdr_xid_dblks = size_dblks(enq_hdr::size() + _enq_hdr._xidsize);
- const u_int32_t hdr_data_dblks = size_dblks(enq_hdr::size() + _enq_hdr._xidsize +
- _enq_hdr._dsize);
- const u_int32_t hdr_tail_dblks = size_dblks(enq_hdr::size() + _enq_hdr._xidsize +
- _enq_hdr._dsize + rec_tail::size());
+ const u_int32_t hdr_xid_size = enq_hdr::size() + _enq_hdr._xidsize;
+ const u_int32_t hdr_xid_data_size = hdr_xid_size + (_enq_hdr.is_external() ? 0 : _enq_hdr._dsize);
+ const u_int32_t hdr_xid_data_tail_size = hdr_xid_data_size + rec_tail::size();
+ const u_int32_t hdr_xid_dblks = size_dblks(hdr_xid_size);
+ const u_int32_t hdr_data_dblks = size_dblks(hdr_xid_data_size);
+ const u_int32_t hdr_tail_dblks = size_dblks(hdr_xid_data_tail_size);
// Check if record (header + data + tail) fits within this page, we can check the
// tail before the expense of copying data to memory
if (hdr_tail_dblks <= max_size_dblks)
16 years
rhmessaging commits: r3127 - mgmt/trunk/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2009-02-23 14:13:41 -0500 (Mon, 23 Feb 2009)
New Revision: 3127
Modified:
mgmt/trunk/cumin/python/cumin/slot.py
Log:
Stop trying to consolidate dup slots, and add slot aging
Modified: mgmt/trunk/cumin/python/cumin/slot.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/slot.py 2009-02-23 15:28:19 UTC (rev 3126)
+++ mgmt/trunk/cumin/python/cumin/slot.py 2009-02-23 19:13:41 UTC (rev 3127)
@@ -11,13 +11,12 @@
strings = StringCatalog(__file__)
log = logging.getLogger("cumin.job")
+# XXX marked for death
class UniqueSlot(CuminTable):
def get_recent_sql_where(self, session):
return """
- (s.qmf_update_time =
- (select max(qmf_update_time)
- from slot as ss
- where s.name = ss.name))"""
+ s.qmf_update_time > now() - interval '60 minutes'
+ """
def do_get_items(self, session, *args):
cursor = super(UniqueSlot, self).do_get_items(session, *args)
16 years
rhmessaging commits: r3126 - mgmt/trunk/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2009-02-23 10:28:19 -0500 (Mon, 23 Feb 2009)
New Revision: 3126
Modified:
mgmt/trunk/cumin/python/cumin/brokerlink.py
Log:
Handle null stats
Modified: mgmt/trunk/cumin/python/cumin/brokerlink.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/brokerlink.py 2009-02-20 13:34:27 UTC (rev 3125)
+++ mgmt/trunk/cumin/python/cumin/brokerlink.py 2009-02-23 15:28:19 UTC (rev 3126)
@@ -278,10 +278,12 @@
return link.statsCurr.lastError and "red" or "green"
def render_peer_state(self, session, peer):
- return peer.statsCurr.state
+ if peer.statsCurr:
+ return peer.statsCurr.state
def render_peer_error(self, session, peer):
- return peer.statsCurr.lastError
+ if peer.statsCurr:
+ return peer.statsCurr.lastError
class PeerView(CuminView):
def __init__(self, app, name):
16 years
rhmessaging commits: r3125 - in store/branches/java/broker-queue-refactor/java/bdbstore: lib and 4 other directories.
by rhmessaging-commits@lists.jboss.org
Author: ritchiem
Date: 2009-02-20 08:34:27 -0500 (Fri, 20 Feb 2009)
New Revision: 3125
Added:
store/branches/java/broker-queue-refactor/java/bdbstore/etc/persistent_config.xml
store/branches/java/broker-queue-refactor/java/bdbstore/etc/virtualhosts.xml
store/branches/java/broker-queue-refactor/java/bdbstore/lib/commons-logging-1.0.4.jar
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/InspectableBDBMessageStore.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTest.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTransactionalTest.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistentNoAckTest.java
Modified:
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageReSendTest.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/QueueDeleteWhilstRoutingTest.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
Log:
QPID-1632 : Update to perform internal reference counting. Additional testing via MessagePersistent*Tests
Added: store/branches/java/broker-queue-refactor/java/bdbstore/etc/persistent_config.xml
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/etc/persistent_config.xml (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/etc/persistent_config.xml 2009-02-20 13:34:27 UTC (rev 3125)
@@ -0,0 +1,122 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+
+ This is an example config using the BDBMessageStore available from
+ the Red Hat Messaging project at etp.108.redhat.com and distributed under GPL.
+ -->
+
+<broker>
+ <prefix>${QPID_HOME}</prefix>
+ <work>${QPID_WORK}</work>
+ <conf>${prefix}/etc</conf>
+ <connector>
+ <transport>nio</transport>
+ <port>5672</port>
+ <sslport>8672</sslport>
+ <socketReceiveBuffer>32768</socketReceiveBuffer>
+ <socketSendBuffer>32768</socketSendBuffer>
+ </connector>
+ <management>
+ <enabled>true</enabled>
+ <jmxport>8999</jmxport>
+ <security-enabled>false</security-enabled>
+ <ssl>
+ <enabled>true</enabled>
+ <!-- Update below path to your keystore location, eg ${conf}/qpid.keystore -->
+ <keyStorePath>${prefix}/../test_resources/ssl/keystore.jks</keyStorePath>
+ <keyStorePassword>password</keyStorePassword>
+ </ssl>
+ </management>
+ <advanced>
+ <filterchain enableExecutorPool="true"/>
+ <enablePooledAllocator>false</enablePooledAllocator>
+ <enableDirectBuffers>false</enableDirectBuffers>
+ <framesize>65535</framesize>
+ <compressBufferOnQueue>false</compressBufferOnQueue>
+ </advanced>
+
+ <security>
+ <principal-databases>
+ <principal-database>
+ <name>passwordfile</name>
+ <class>org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase</class>
+ <attributes>
+ <attribute>
+ <name>passwordFile</name>
+ <value>${conf}/passwd</value>
+ </attribute>
+ </attributes>
+ </principal-database>
+ </principal-databases>
+
+ <access>
+ <class>org.apache.qpid.server.security.access.plugins.AllowAll</class>
+ </access>
+ <jmx>
+ <access>${conf}/jmxremote.access</access>
+ <principal-database>passwordfile</principal-database>
+ </jmx>
+ </security>
+
+ <virtualhosts>
+ <virtualhost>
+ <name>localhost</name>
+ <localhost>
+ <store>
+ <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
+ <environment-path>${work}/bdbstore/localhost-store</environment-path>
+ </store>
+ </localhost>
+ </virtualhost>
+
+ <virtualhost>
+ <name>development</name>
+ <development>
+ <store>
+ <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
+ <environment-path>${work}/bdbstore/development-store</environment-path>
+ </store>
+ </development>
+ </virtualhost>
+
+ <virtualhost>
+ <name>test</name>
+ <test>
+ <store>
+ <class>org.apache.qpid.server.store.berkeleydb.BDBMessageStore</class>
+ <environment-path>${work}/bdbstore/test-store</environment-path>
+ </store>
+ </test>
+ </virtualhost>
+
+ </virtualhosts>
+ <heartbeat>
+ <delay>0</delay>
+ <timeoutFactor>2.0</timeoutFactor>
+ </heartbeat>
+ <queue>
+ <auto_register>true</auto_register>
+ </queue>
+
+ <virtualhosts>${conf}/virtualhosts.xml</virtualhosts>
+</broker>
+
+
Added: store/branches/java/broker-queue-refactor/java/bdbstore/etc/virtualhosts.xml
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/etc/virtualhosts.xml (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/etc/virtualhosts.xml 2009-02-20 13:34:27 UTC (rev 3125)
@@ -0,0 +1,123 @@
+<?xml version="1.0" encoding="ISO-8859-1"?>
+<!--
+ -
+ - Licensed to the Apache Software Foundation (ASF) under one
+ - or more contributor license agreements. See the NOTICE file
+ - distributed with this work for additional information
+ - regarding copyright ownership. The ASF licenses this file
+ - to you under the Apache License, Version 2.0 (the
+ - "License"); you may not use this file except in compliance
+ - with the License. You may obtain a copy of the License at
+ -
+ - http://www.apache.org/licenses/LICENSE-2.0
+ -
+ - Unless required by applicable law or agreed to in writing,
+ - software distributed under the License is distributed on an
+ - "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ - KIND, either express or implied. See the License for the
+ - specific language governing permissions and limitations
+ - under the License.
+ -
+ -->
+<virtualhosts>
+ <default>test</default>
+ <virtualhost>
+ <name>localhost</name>
+ <localhost>
+ <exchanges>
+ <exchange>
+ <type>direct</type>
+ <name>test.direct</name>
+ <durable>true</durable>
+ </exchange>
+ <exchange>
+ <type>topic</type>
+ <name>test.topic</name>
+ </exchange>
+ </exchanges>
+ <queues>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+
+ <queue>
+ <name>queue</name>
+ </queue>
+ <queue>
+ <name>ping</name>
+ </queue>
+ <queue>
+ <name>test-queue</name>
+ <test-queue>
+ <exchange>test.direct</exchange>
+ <durable>true</durable>
+ </test-queue>
+ </queue>
+ <queue>
+ <name>test-ping</name>
+ <test-ping>
+ <exchange>test.direct</exchange>
+ </test-ping>
+ </queue>
+
+ </queues>
+ </localhost>
+ </virtualhost>
+
+
+ <virtualhost>
+ <name>development</name>
+ <development>
+ <queues>
+ <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
+ <maximumMessageCount>5000</maximumMessageCount>
+ <queue>
+ <name>queue</name>
+ <queue>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+ </queue>
+ </queue>
+ <queue>
+ <name>ping</name>
+ <ping>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+ </ping>
+ </queue>
+ </queues>
+ </development>
+ </virtualhost>
+ <virtualhost>
+ <name>test</name>
+ <test>
+ <queues>
+ <minimumAlertRepeatGap>30000</minimumAlertRepeatGap>
+ <maximumMessageCount>5000</maximumMessageCount>
+ <queue>
+ <name>queue</name>
+ <queue>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+ </queue>
+ </queue>
+ <queue>
+ <name>ping</name>
+ <ping>
+ <exchange>amq.direct</exchange>
+ <maximumQueueDepth>4235264</maximumQueueDepth> <!-- 4Mb -->
+ <maximumMessageSize>2117632</maximumMessageSize> <!-- 2Mb -->
+ <maximumMessageAge>600000</maximumMessageAge> <!-- 10 mins -->
+ </ping>
+ </queue>
+ </queues>
+ </test>
+ </virtualhost>
+</virtualhosts>
Added: store/branches/java/broker-queue-refactor/java/bdbstore/lib/commons-logging-1.0.4.jar
===================================================================
(Binary files differ)
Property changes on: store/branches/java/broker-queue-refactor/java/bdbstore/lib/commons-logging-1.0.4.jar
___________________________________________________________________
Name: svn:mime-type
+ application/octet-stream
Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2009-02-19 19:02:07 UTC (rev 3124)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2009-02-20 13:34:27 UTC (rev 3125)
@@ -49,15 +49,16 @@
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.PersistentAMQMessage;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.routing.RoutingTable;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.berkeleydb.tuples.BindingTupleBindingFactory;
import org.apache.qpid.server.store.berkeleydb.tuples.QueueTuple;
import org.apache.qpid.server.store.berkeleydb.tuples.QueueTupleBindingFactory;
+import org.apache.qpid.server.transactionlog.TransactionLog;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.transactionlog.TransactionLog;
-import org.apache.qpid.server.routing.RoutingTable;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import java.io.File;
import java.util.ArrayList;
@@ -130,8 +131,11 @@
private Map<AMQShortString, Long> _queueNameToIdMap = new ConcurrentHashMap<AMQShortString, Long>();
+ protected Map<Long, List<AMQQueue>> _messageOnQueueMap = new ConcurrentHashMap<Long, List<AMQQueue>>();
+
+ private final Map<Transaction, Map<Long, List<AMQQueue>>> _dequeueTxMap = new HashMap<Transaction, Map<Long, List<AMQQueue>>>();
+
// Factory Classes to create the TupleBinding objects that relfect the version instance of this BDBStore
-
private QueueTupleBindingFactory _queueTupleBindingFactory;
private BindingTupleBindingFactory _bindingTupleBindingFactory;
@@ -188,16 +192,15 @@
* whatever parameters it wants.
*
* @param virtualHost The virtual host using by this store
- * @param base The base element identifier from which all configuration items are relative. For example, if
- * the base element is "store", the all elements used by concrete classes will be "store.foo"
- * etc.
- * @param config The apache commons configuration object.
+ * @param base Not used
+ * @param vHostConfig The configuration for this virtualhost
*
* @throws Exception If any error occurs that means the store is unable to configure itself.
*/
- public void configure(VirtualHost virtualHost, String base, Configuration config) throws Exception
+ public void configure(VirtualHost virtualHost, String base, VirtualHostConfiguration vHostConfig) throws Exception
{
- File environmentPath = new File(config.getString(base + "." + ENVIRONMENT_PATH_PROPERTY, "bdbEnv"));
+ Configuration config = vHostConfig.getStoreConfiguration();
+ File environmentPath = new File(config.getString(ENVIRONMENT_PATH_PROPERTY, "bdbEnv"));
if (!environmentPath.exists())
{
if (!environmentPath.mkdirs())
@@ -207,7 +210,7 @@
}
}
- _version = config.getInt(base + "." + DATABASE_FORMAT_VERSION_PROPERTY, DATABASE_FORMAT_VERSION);
+ _version = config.getInt(DATABASE_FORMAT_VERSION_PROPERTY, DATABASE_FORMAT_VERSION);
configure(virtualHost, environmentPath, false);
}
@@ -477,13 +480,14 @@
/**
* Removes the specified message from the store in the given transactional store context.
- *
+ * Internal method that is package scoped to allow testing.
+ *
* @param context The transactional context to remove the message in.
* @param messageId Identifies the message to remove.
*
* @throws AMQException If the operation fails for any reason.
*/
- public void removeMessage(StoreContext context, Long messageId) throws AMQException
+ void removeMessage(StoreContext context, Long messageId) throws AMQException
{
// _log.debug("public void removeMessage(StoreContext context = " + context + ", Long messageId = " + messageId
// + "): called");
@@ -559,7 +563,7 @@
if (localTx)
{
- commit(tx);
+ tx.commit();
context.setPayload(null);
}
}
@@ -943,6 +947,8 @@
DatabaseEntry value = new DatabaseEntry();
ByteBinding.byteToEntry((byte) 0, value);
+ recordEnqueue(messageId, queue);
+
try
{
_deliveryDb.put(tx, key, value);
@@ -968,6 +974,7 @@
try
{
tx = _environment.beginTransaction(null, null);
+ _log.info("Creating local transaction:" + tx);
context.setPayload(tx);
return true;
@@ -1026,11 +1033,32 @@
}
+ //Record the delete for processing AFTER the commit has taken place.
+ synchronized (_dequeueTxMap)
+ {
+ Map<Long, List<AMQQueue>> transactionMap = _dequeueTxMap.get(tx);
+ if (transactionMap == null)
+ {
+ transactionMap = new HashMap<Long, List<AMQQueue>>();
+ _dequeueTxMap.put(tx, transactionMap);
+ }
+
+ List<AMQQueue> queueList = transactionMap.get(messageId);
+
+ if (queueList == null)
+ {
+ queueList = new LinkedList<AMQQueue>();
+ transactionMap.put(messageId, queueList);
+ }
+
+ queueList.add(queue);
+ }
+
if (isLocal)
{
commit(tx);
context.setPayload(null);
- }
+ }
}
catch (DatabaseException e)
{
@@ -1048,17 +1076,19 @@
{
throw new AMQException("Error rolling back transaction: " + e1, e1);
}
+ finally
+ {
+ synchronized (_dequeueTxMap)
+ {
+ _dequeueTxMap.remove(tx);
+ }
+ }
}
throw new AMQException("Error accessing database while dequeuing message: " + e, e);
}
}
- private boolean isLocalTransaction(StoreContext context)
- {
- return context.getPayload() == null;
- }
-
/**
* Begins a transactional context.
*
@@ -1260,8 +1290,9 @@
catch (Throwable ioobe)
{
abortTran(context);
+
throw new AMQException("Invalid database format. Please use upgrade tool for store in Virtualhost:'"
- + _virtualHost.getName() + "'", ioobe);
+ + _virtualHost.getName() + "'", ioobe.getCause() != null ? ioobe.getCause() : ioobe);
}
stateTransition(State.RECOVERING, State.STARTED);
@@ -1599,31 +1630,11 @@
}
}
- private static final class ProcessAction
- {
- private final AMQQueue _queue;
- private final StoreContext _context;
- private final AMQMessage _message;
- public ProcessAction(AMQQueue queue, StoreContext context, AMQMessage message)
- {
- _queue = queue;
- _context = context;
- _message = message;
- }
-
- public void process() throws AMQException
- {
- _queue.enqueue(_context, _message);
- }
-
- }
-
private void deliverMessages(final StoreContext context, Map<AMQShortString, AMQQueue> queues)
throws DatabaseException, AMQException
{
Map<Long, PersistentAMQMessage> msgMap = new HashMap<Long, PersistentAMQMessage>();
- List<ProcessAction> actions = new ArrayList<ProcessAction>();
Map<AMQShortString, Integer> queueRecoveries = new TreeMap<AMQShortString, Integer>();
@@ -1660,13 +1671,8 @@
maxId = Math.max(maxId, messageId);
PersistentAMQMessage message = msgMap.get(messageId);
- if (message != null)
+ if (message == null)
{
- message.incrementReference();
- }
- else
- {
-
message = (PersistentAMQMessage) _messageFactory.createMessage(messageId, this);
msgMap.put(messageId, message);
@@ -1682,7 +1688,6 @@
ContentChunk cc = getContentBodyChunk(context, messageId, index);
message.recoverContentBodyFrame(cc, index == count);
}
-
}
if (_log.isDebugEnabled())
@@ -1702,15 +1707,11 @@
}
- actions.add(new ProcessAction(queue, context, message));
+ recordEnqueue(messageId, queue);
+ queue.enqueue(context, message);
}
- for (ProcessAction action : actions)
- {
- action.process();
- }
-
_messageId.set(maxId + 1);
}
catch (DatabaseException e)
@@ -1753,9 +1754,21 @@
{
// _log.debug("void commit(Transaction tx = " + tx + "): called");
+ if (tx == null)
+ {
+ throw new DatabaseException("Fatal internal error: transactional context is empty at commitTran");
+ }
+
tx.commitNoSync();
- Commit commit = new Commit(_commitThread, tx);
+ Map<Long, List<AMQQueue>> dequeueMap = null;
+ synchronized (_dequeueTxMap)
+ {
+ dequeueMap = _dequeueTxMap.remove(tx);
+ }
+
+ Commit commit = new Commit(_commitThread, tx, dequeueMap, this);
+
commit.commit();
}
@@ -1773,14 +1786,19 @@
private final Transaction _tx;
private DatabaseException _databaseException;
private boolean _complete;
+ private Map<Long, List<AMQQueue>> _messageDequeueMap;
+ private TransactionLog _transactionLog;
- public Commit(CommitThread commitThread, Transaction tx)
+ public Commit(CommitThread commitThread, Transaction tx, Map<Long,
+ List<AMQQueue>> messageDequeueMap, TransactionLog transactionLog)
{
// _log.debug("public Commit(CommitThread commitThread = " + commitThread + ", Transaction tx = " + tx
// + "): called");
_commitThread = commitThread;
_tx = tx;
+ _messageDequeueMap = messageDequeueMap;
+ _transactionLog = transactionLog;
}
public void prepare(boolean synch) throws DatabaseException
@@ -1813,7 +1831,24 @@
_complete = true;
+ // If we have dequeued messages, update our internal state
+ if (_messageDequeueMap != null)
+ {
+ _log.info("Transaction(" + _tx + ") Complete : Dequeuing messages used.");
+ StoreContext dequeueMessageContext = new StoreContext();
+
+ for (Map.Entry<Long, List<AMQQueue>> entry : _messageDequeueMap.entrySet())
+ {
+ Long id = entry.getKey();
+ for (AMQQueue queue : entry.getValue())
+ {
+ ((BDBMessageStore) _transactionLog).recordDequeue(dequeueMessageContext, id, queue);
+ }
+ }
+ }
+
notify();
+
}
public synchronized void abort(DatabaseException databaseException)
@@ -1858,6 +1893,72 @@
}
/**
+ * Record that the given message is enqueued on the specified queue.
+ *
+ * @param messageId The message id to enqueue
+ * @param queue The queue it is enqueued upon.
+ */
+ private void recordEnqueue(Long messageId, AMQQueue queue)
+ {
+ List<AMQQueue> queues = _messageOnQueueMap.get(messageId);
+
+ if (queues == null)
+ {
+ queues = new LinkedList<AMQQueue>();
+ }
+
+ queues.add(queue);
+
+ _messageOnQueueMap.put(messageId, queues);
+ }
+
+ /**
+ * Update our records that the given message is no longer on the specified queue.
+ * If the message no longer has any queue references then we can discard the content.
+ *
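+ * @param context The store context passed on to removeMessage() if the message content is discarded.
+ * @param messageId The id of the message being dequeued.
+ * @param queue The queue the message is being dequeued from.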
+ */
+ private void recordDequeue(StoreContext context, Long messageId, AMQQueue queue)
+ {
+ _log.info("Dequeue Message(" + messageId + ") from queue(" + queue.getName() + ") context=" + context);
+ List<AMQQueue> queues = _messageOnQueueMap.get(messageId);
+
+ if (queues == null)
+ {
+ throw new RuntimeException("Error, Tried to dequeue a message that is not enqueued");
+ }
+
+ if (queues.remove(queue))
+ {
+ // if we now have no more references to this message we can dispose of it
+ if (queues.size() == 0)
+ {
+ try
+ {
+ _messageOnQueueMap.remove(messageId);
+ _log.info("Removing Message(" + messageId + ") from Tlog context=" + context);
+
+ removeMessage(context, messageId);
+ }
+ catch (AMQException e)
+ {
+ //todo As we are just swallowing the exception, we need to add clean up in recover().
+ // This should purge any message content that doesn't have any delivery records.
+ _log.debug("Error occured removing unreferenced message:" + e.getMessage());
+ }
+
+ }
+ }
+ else
+ {
+ throw new RuntimeException("Error, Tried to dequeue a message from a queue, upon which it is not enqueued");
+ }
+
+ }
+
+ /**
* Implements a thread which batches and commits a queue of {@link Commit} operations. The commit operations
* themselves are responsible for adding themselves to the queue and waiting for the commit to happen before
* continuing, but it is the responsibility of this thread to tell the commit operations when they have been
Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java 2009-02-19 19:02:07 UTC (rev 3124)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java 2009-02-20 13:34:27 UTC (rev 3125)
@@ -20,33 +20,52 @@
import com.sleepycat.je.DatabaseException;
import junit.framework.Assert;
import junit.framework.TestSuite;
+import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
-import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.DefaultExchangeFactory;
import org.apache.qpid.server.exchange.DirectExchange;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.store.StoreContext;
-import org.apache.qpid.server.store.berkeleydb.utils.BDBVMTestCase;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.flow.LimitlessCreditManager;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQPriorityQueue;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.MessageMetaData;
-import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.queue.AMQPriorityQueue;
import org.apache.qpid.server.queue.MockContentChunk;
+import org.apache.qpid.server.queue.MockPersistentAMQMessage;
+import org.apache.qpid.server.queue.MockProtocolSession;
+import org.apache.qpid.server.queue.MockQueueEntry;
+import org.apache.qpid.server.queue.SimpleAMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.berkeleydb.utils.BDBVMTestCase;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.util.NullApplicationRegistry;
-import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import java.io.File;
+import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
@@ -54,7 +73,7 @@
{
private static final Logger _log = Logger.getLogger(BDBStoreTest.class);
- private BDBMessageStore _store;
+ private BDBMessageStore _transactionLog;
private String STORE_LOCATION = System.getProperty("BDB_WORK") + "/bdbTestEnv";
private StoreContext _storeContext = new StoreContext();
@@ -85,13 +104,13 @@
deleteDirectory(bdbDir);
BDB_DIR.mkdirs();
- _store = new BDBMessageStore();
- _store.configure(BDB_DIR);
+ _transactionLog = new InspectableBDBMessageStore();
+ _transactionLog.configure(BDB_DIR);
- _virtualHost = new VirtualHost("test", _store);
- _store.setVirtualHost(_virtualHost);
+ _virtualHost = new VirtualHost(new VirtualHostConfiguration("test", new PropertiesConfiguration()), _transactionLog);
+ _transactionLog.setVirtualHost(_virtualHost);
- _txnContext = new NonTransactionalContext(_store, _storeContext, null, new LinkedList<RequiredDeliveryException>());
+ _txnContext = new NonTransactionalContext(_transactionLog, _storeContext, null, new LinkedList<RequiredDeliveryException>());
}
private void reload() throws Exception
@@ -101,10 +120,10 @@
PropertiesConfiguration env = new PropertiesConfiguration();
env.addProperty("store.environment-path", STORE_LOCATION);
- env.addProperty("store.class", "org.apache.qpid.server.store.berkeleydb.BDBMessageStore");
+ env.addProperty("store.class", "org.apache.qpid.server.store.berkeleydb.InspectableBDBMessageStore");
- _virtualHost = new VirtualHost("test", env);
- _store = (BDBMessageStore) _virtualHost.getTransactionLog();
+ _virtualHost = new VirtualHost(new VirtualHostConfiguration("test", env));
+ _transactionLog = (BDBMessageStore) _virtualHost.getTransactionLog();
}
public void tearDown() throws Exception
@@ -151,13 +170,13 @@
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, queueArguments);
- _store.createQueue(queue, queueArguments);
+ _transactionLog.createQueue(queue, queueArguments);
AMQShortString routingKey = new AMQShortString("Test-Key");
FieldTable bindArguments = new FieldTable();
bindArguments.put(AMQPFilterTypes.JMS_SELECTOR.getValue(), "Test = 'MST'");
- _store.bindQueue(_virtualHost.getExchangeRegistry().getDefaultExchange(), routingKey, queue, bindArguments);
+ _transactionLog.bindQueue(_virtualHost.getExchangeRegistry().getDefaultExchange(), routingKey, queue, bindArguments);
reload();
@@ -177,7 +196,7 @@
private MessagePublishInfo createPublishBody()
{
- return new MessagePublishInfoImpl(MYEXCHANGE,false,true,RK);
+ return new MessagePublishInfoImpl(MYEXCHANGE, false, true, RK);
}
private BasicContentHeaderProperties createContentHeaderProperties()
@@ -230,10 +249,10 @@
ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
- _store.storeMessageMetaData(_storeContext, 14L, new MessageMetaData(pubBody, chb, 1));
- _store.storeContentBodyChunk(_storeContext, 14L, 0, body, true);
+ _transactionLog.storeMessageMetaData(_storeContext, 14L, new MessageMetaData(pubBody, chb, 1));
+ _transactionLog.storeContentBodyChunk(_storeContext, 14L, 0, body, true);
- MessageMetaData mmd = _store.getMessageMetaData(_storeContext, 14L);
+ MessageMetaData mmd = _transactionLog.getMessageMetaData(_storeContext, 14L);
MessagePublishInfo returnedPubBody = mmd.getMessagePublishInfo();
Assert.assertEquals("Message exchange has changed", pubBody.getExchange(), returnedPubBody.getExchange());
Assert.assertEquals("Immediate flag has changed", pubBody.isImmediate(), returnedPubBody.isImmediate());
@@ -248,7 +267,7 @@
Assert.assertEquals("Property ContentType has changed", props.getContentTypeAsString(), returnedProperties.getContentTypeAsString());
Assert.assertEquals("Property MessageID has changed", props.getMessageIdAsString(), returnedProperties.getMessageIdAsString());
Assert.assertEquals("MessageMD ChunkCount has changed", mmd.getContentChunkCount(), 1);
- ContentChunk returnedContentBody = _store.getContentBodyChunk(_storeContext, 14L, 0);
+ ContentChunk returnedContentBody = _transactionLog.getContentBodyChunk(_storeContext, 14L, 0);
ByteBuffer returnedPayloadAsBytes = returnedContentBody.getData();
byte[] returnedPayload = new byte[returnedPayloadAsBytes.remaining()];
returnedPayloadAsBytes.get(returnedPayload);
@@ -264,15 +283,15 @@
ContentChunk body = createContentChunk(bodyText);
ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
- _store.storeMessageMetaData(_storeContext, 15L, new MessageMetaData(pubBody, chb, 1));
- _store.storeContentBodyChunk(_storeContext, 15L, 0, body, true);
- _store.getContentBodyChunk(_storeContext, 15L, 0);
- _store.removeMessage(_storeContext, 15L);
+ _transactionLog.storeMessageMetaData(_storeContext, 15L, new MessageMetaData(pubBody, chb, 1));
+ _transactionLog.storeContentBodyChunk(_storeContext, 15L, 0, body, true);
+ _transactionLog.getContentBodyChunk(_storeContext, 15L, 0);
+ _transactionLog.removeMessage(_storeContext, 15L);
// the next line should throw since the message id should not be found
try
{
- _store.getMessageMetaData(_storeContext, 15L);
+ _transactionLog.getMessageMetaData(_storeContext, 15L);
Assert.fail("No exception thrown when message id not found getting metadata");
}
catch (AMQException e)
@@ -282,7 +301,7 @@
try
{
- _store.getContentBodyChunk(_storeContext, 15L, 0);
+ _transactionLog.getContentBodyChunk(_storeContext, 15L, 0);
Assert.fail("No exception thrown when message id not found getting content chunk");
}
catch (AMQException e)
@@ -300,19 +319,19 @@
ContentChunk body = createContentChunk(bodyText);
ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
- _store.storeMessageMetaData(_storeContext, 20L, new MessageMetaData(pubBody, chb, 0));
- _store.storeMessageMetaData(_storeContext, 21L, new MessageMetaData(pubBody, chb, 0));
- _store.storeMessageMetaData(_storeContext, 22L, new MessageMetaData(pubBody, chb, 0));
+ _transactionLog.storeMessageMetaData(_storeContext, 20L, new MessageMetaData(pubBody, chb, 0));
+ _transactionLog.storeMessageMetaData(_storeContext, 21L, new MessageMetaData(pubBody, chb, 0));
+ _transactionLog.storeMessageMetaData(_storeContext, 22L, new MessageMetaData(pubBody, chb, 0));
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
- _store.createQueue(queue);
+ _transactionLog.createQueue(queue);
- _store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, queue, 20L);
- _store.enqueueMessage(_storeContext, queue, 21L);
- _store.commitTran(_storeContext);
+ _transactionLog.beginTran(_storeContext);
+ _transactionLog.enqueueMessage(_storeContext, queue, 20L);
+ _transactionLog.enqueueMessage(_storeContext, queue, 21L);
+ _transactionLog.commitTran(_storeContext);
- List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
+ List<Long> enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
Assert.assertEquals("Enqueued messages have changed", 2, enqueuedIds.size());
Long val = enqueuedIds.get(0);
Assert.assertEquals("First Message is incorrect", 20L, val.longValue());
@@ -323,7 +342,7 @@
public void testTranRollback1() throws Exception
{
- List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
+ List<Long> enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
assertTrue("Last Test Messages are still present", enqueuedIds.isEmpty());
MessagePublishInfo pubBody = createPublishBody();
@@ -333,26 +352,26 @@
ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
- _store.storeMessageMetaData(_storeContext, 30L, new MessageMetaData(pubBody, chb, 0));
- _store.storeMessageMetaData(_storeContext, 31L, new MessageMetaData(pubBody, chb, 0));
- _store.storeMessageMetaData(_storeContext, 32L, new MessageMetaData(pubBody, chb, 0));
+ _transactionLog.storeMessageMetaData(_storeContext, 30L, new MessageMetaData(pubBody, chb, 0));
+ _transactionLog.storeMessageMetaData(_storeContext, 31L, new MessageMetaData(pubBody, chb, 0));
+ _transactionLog.storeMessageMetaData(_storeContext, 32L, new MessageMetaData(pubBody, chb, 0));
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
- _store.createQueue(queue);
+ _transactionLog.createQueue(queue);
- _store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, queue, 30L);
- _store.enqueueMessage(_storeContext, queue, 31L);
- _store.commitTran(_storeContext);
+ _transactionLog.beginTran(_storeContext);
+ _transactionLog.enqueueMessage(_storeContext, queue, 30L);
+ _transactionLog.enqueueMessage(_storeContext, queue, 31L);
+ _transactionLog.commitTran(_storeContext);
- _store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, queue, 32L);
- _store.abortTran(_storeContext);
+ _transactionLog.beginTran(_storeContext);
+ _transactionLog.enqueueMessage(_storeContext, queue, 32L);
+ _transactionLog.abortTran(_storeContext);
- _store.beginTran(_storeContext);
- _store.commitTran(_storeContext);
+ _transactionLog.beginTran(_storeContext);
+ _transactionLog.commitTran(_storeContext);
- enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
+ enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
assertTrue("Last Test Message is still present", !enqueuedIds.contains(20L));
assertEquals("Incorrect Enqueued Message Count:", 2, enqueuedIds.size());
Long val = enqueuedIds.get(0);
@@ -364,7 +383,7 @@
public void testTranRollback2() throws Exception
{
- List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
+ List<Long> enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
assertTrue("Last Test Messages are still present", enqueuedIds.isEmpty());
MessagePublishInfo pubBody = createPublishBody();
@@ -374,23 +393,23 @@
ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
- _store.storeMessageMetaData(_storeContext, 30L, new MessageMetaData(pubBody, chb, 0));
- _store.storeMessageMetaData(_storeContext, 31L, new MessageMetaData(pubBody, chb, 0));
- _store.storeMessageMetaData(_storeContext, 32L, new MessageMetaData(pubBody, chb, 0));
+ _transactionLog.storeMessageMetaData(_storeContext, 30L, new MessageMetaData(pubBody, chb, 0));
+ _transactionLog.storeMessageMetaData(_storeContext, 31L, new MessageMetaData(pubBody, chb, 0));
+ _transactionLog.storeMessageMetaData(_storeContext, 32L, new MessageMetaData(pubBody, chb, 0));
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
- _store.createQueue(queue);
+ _transactionLog.createQueue(queue);
- _store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, queue, 30L);
- _store.abortTran(_storeContext);
+ _transactionLog.beginTran(_storeContext);
+ _transactionLog.enqueueMessage(_storeContext, queue, 30L);
+ _transactionLog.abortTran(_storeContext);
- _store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, queue, 31L);
- _store.enqueueMessage(_storeContext, queue, 32L);
- _store.commitTran(_storeContext);
+ _transactionLog.beginTran(_storeContext);
+ _transactionLog.enqueueMessage(_storeContext, queue, 31L);
+ _transactionLog.enqueueMessage(_storeContext, queue, 32L);
+ _transactionLog.commitTran(_storeContext);
- enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
+ enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
Assert.assertEquals("Incorrect Enqueued Message Count", 2, enqueuedIds.size());
Long val = enqueuedIds.get(0);
Assert.assertEquals("First Message is incorrect", 31L, val.longValue());
@@ -400,7 +419,7 @@
public void testRecovery() throws Exception
{
- List<Long> enqueuedIds = _store.getEnqueuedMessages(QUEUE1);
+ List<Long> enqueuedIds = _transactionLog.getEnqueuedMessages(QUEUE1);
assertTrue("Last Test Messages are still present", enqueuedIds.isEmpty());
MessagePublishInfo pubBody = createPublishBody();
@@ -410,25 +429,25 @@
ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
- _store.storeMessageMetaData(_storeContext, 40L, new MessageMetaData(pubBody, chb, 0));
- _store.storeMessageMetaData(_storeContext, 41L, new MessageMetaData(pubBody, chb, 0));
- _store.storeMessageMetaData(_storeContext, 42L, new MessageMetaData(pubBody, chb, 1));
+ _transactionLog.storeMessageMetaData(_storeContext, 40L, new MessageMetaData(pubBody, chb, 0));
+ _transactionLog.storeMessageMetaData(_storeContext, 41L, new MessageMetaData(pubBody, chb, 0));
+ _transactionLog.storeMessageMetaData(_storeContext, 42L, new MessageMetaData(pubBody, chb, 1));
- _store.storeContentBodyChunk(_storeContext, 42L, 0, new MockContentChunk(), true);
+ _transactionLog.storeContentBodyChunk(_storeContext, 42L, 0, new MockContentChunk(), true);
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
AMQQueue queue2 = AMQQueueFactory.createAMQQueueImpl(QUEUE2, true, HIM, false, _virtualHost, null);
- _store.createQueue(queue);
- _store.createQueue(queue2);
+ _transactionLog.createQueue(queue);
+ _transactionLog.createQueue(queue2);
- _store.beginTran(_storeContext);
- _store.enqueueMessage(_storeContext, queue, 40L);
- _store.enqueueMessage(_storeContext, queue, 41L);
- _store.enqueueMessage(_storeContext, queue2, 42L);
- _store.commitTran(_storeContext);
+ _transactionLog.beginTran(_storeContext);
+ _transactionLog.enqueueMessage(_storeContext, queue, 40L);
+ _transactionLog.enqueueMessage(_storeContext, queue, 41L);
+ _transactionLog.enqueueMessage(_storeContext, queue2, 42L);
+ _transactionLog.commitTran(_storeContext);
- _store.enqueueMessage(_storeContext, queue, 42L);
+ _transactionLog.enqueueMessage(_storeContext, queue, 42L);
reload();
@@ -459,23 +478,23 @@
ContentHeaderBody chb = createContentHeaderBody(props, body.getSize());
- _store.storeMessageMetaData(_storeContext, 50L, new MessageMetaData(pubBody, chb, 0));
+ _transactionLog.storeMessageMetaData(_storeContext, 50L, new MessageMetaData(pubBody, chb, 0));
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
- _store.createQueue(queue);
+ _transactionLog.createQueue(queue);
- _store.enqueueMessage(_storeContext, queue, 50L);
- _store.dequeueMessage(_storeContext, queue, 50L);
+ _transactionLog.enqueueMessage(_storeContext, queue, 50L);
+ _transactionLog.dequeueMessage(_storeContext, queue, 50L);
}
public void testQueueRemove() throws AMQException
{
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(QUEUE1, true, ME, false, _virtualHost, null);
- _store.createQueue(queue);
- _store.removeQueue(queue);
+ _transactionLog.createQueue(queue);
+ _transactionLog.removeQueue(queue);
try
{
- _store.removeQueue(queue);
+ _transactionLog.removeQueue(queue);
Assert.fail("No exception thrown when deleting non-existant queue");
}
catch (AMQException e)
Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java 2009-02-19 19:02:07 UTC (rev 3124)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBUpgradeTest.java 2009-02-20 13:34:27 UTC (rev 3125)
@@ -24,9 +24,13 @@
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.berkeleydb.utils.Publisher;
import org.apache.qpid.server.store.berkeleydb.utils.DurableSubscriber;
+import org.apache.qpid.server.queue.MessageFactory;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.client.transport.TransportConnection;
import org.apache.qpid.util.FileUtils;
import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.PropertiesConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,9 +46,9 @@
{
protected static final Logger _logger = LoggerFactory.getLogger(BDBUpgradeTest.class);
- final String BDBHome = System.getProperty("BDB_HOME");
- final File _configFile = new File(BDBHome, "etc/config.xml");
-
+ protected final String QpidHome = System.getProperty("QPID_HOME");
+ protected final File _configFile = new File(QpidHome, "etc/persistent_config.xml");
+
private String VIRTUALHOST = "test";
private static final String VERSION_1 = "1";
@@ -113,57 +117,69 @@
}
}
+ public void testDurabability() throws Exception
+ {
+ String broker = "vm://:1";
+
+ startBroker(1, VERSION_1);
+
+ sendAndCheckDurableSubscriber(broker, true, false, 10, null);
+
+ stopBroker(1);
+
+ startBroker(1, VERSION_1);
+
+ sendAndCheckDurableSubscriber(broker, false, true, 10, null);
+
+ stopBroker(1);
+ }
+
+ /**
+ * This test is currently broken due to QPID-1275
+ * @throws Exception
+ */
public void testDurababilitySelectors() throws Exception
{
String broker = "vm://:1";
+ System.err.println("Start V1 Broker");
startBroker(1, VERSION_1);
new DurableSubscriber(broker, _topic, null).close();
+ System.err.println("Send 10 message to topic:"+_topic);
sendMessages(broker, _topic, 10);
+ //don't actually send any messages just check we get 5 odd messages
sendAndCheckDurableSubscriber(broker, false, true, 5, "odd=true");
Thread.sleep(2000);
stopBroker(1);
+ System.err.println("Upgrade Broker");
upgradeBroker();
- broker = "vm://:2";
+ System.err.println("Start V2 Broker");
+ startBroker(1, VERSION_2);
- startBroker(2, VERSION_2);
-
+ System.err.println("Ensure we can receive all the messagse messages but don't ack them");
//Ensure msg were transitioned to new broker
+ //don't actually send any messages just check we get 5 odd messages
sendAndCheckDurableSubscriber(broker, false, false, 5, null);
- //Reset the Selector Pattern
+ System.err.println("Change selector so on recovery the messages are dropped");
+ //Reset the Selector Pattern; this should recreate the queue
new DurableSubscriber(broker, _topic, "odd=true").close();
- stopBroker(2);
-
- startBroker(2, VERSION_2);
-
- //Ensure that the selector was preseved on restart and caused all msgs to be removed.
- sendAndCheckDurableSubscriber(broker, false, false, 0, null);
- stopBroker(2);
- }
-
- public void testDurabability() throws Exception
- {
- String broker = "vm://:1";
-
- startBroker(1, VERSION_1);
-
- sendAndCheckDurableSubscriber(broker, true, false, 10, null);
-
stopBroker(1);
- startBroker(1, VERSION_1);
+ System.err.println("Start V2 Broker");
- sendAndCheckDurableSubscriber(broker, false, true, 10, null);
+ startBroker(1, VERSION_2);
+ //Ensure that the selector was preserved on restart and caused all msgs to be removed.
+ sendAndCheckDurableSubscriber(broker, false, true, 0, null);
stopBroker(1);
}
@@ -184,16 +200,24 @@
ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(_configFile);
//Disable management on broker.
- config.getConfiguration().setProperty("management.enabled", "false");
+ config.getConfiguration().setManagementEnabled(false);
+
+ //There can be only one MF so we need to reset it here or recovery will not work
+ MessageFactory.getInstance().reset();
+ //todo need to change MF so it is per AppRegistry not a singleton
- Configuration testVirtualhost = config.getConfiguration().subset("virtualhosts.virtualhost." + VIRTUALHOST);
+ ApplicationRegistry.initialise(config, port);
+
+ Configuration testVirtualhost = new PropertiesConfiguration();
testVirtualhost.setProperty("store.class", "org.apache.qpid.server.store.berkeleydb.BDBMessageStore");
- testVirtualhost.setProperty("store." + BDBMessageStore.ENVIRONMENT_PATH_PROPERTY, "${work}/version" + port + "Store");
+ testVirtualhost.setProperty("store." + BDBMessageStore.ENVIRONMENT_PATH_PROPERTY,
+ System.getProperty("QPID_WORK")+ "/version" + version + "Store");
testVirtualhost.setProperty("store.version", version);
- ApplicationRegistry.initialise(config, port);
+ ApplicationRegistry.getInstance(port).getVirtualHostRegistry().
+ registerVirtualHost(new VirtualHost(new VirtualHostConfiguration("bdbtest",testVirtualhost)));
+
TransportConnection.createVMBroker(port);
-
}
public void sendAndCheckDurableSubscriber(String broker, boolean send, boolean commitRecieved, int count, String selector)
Added: store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/InspectableBDBMessageStore.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/InspectableBDBMessageStore.java (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/InspectableBDBMessageStore.java 2009-02-20 13:34:27 UTC (rev 3125)
@@ -0,0 +1,34 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.TestTransactionLog;
+
+import java.util.List;
+
+public class InspectableBDBMessageStore extends BDBMessageStore implements TestTransactionLog
+{
+ public List<AMQQueue> getMessageReferenceMap(Long messageId)
+ {
+ return _messageOnQueueMap.get(messageId);
+ }
+}
Added: store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTest.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTest.java (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTest.java 2009-02-20 13:34:27 UTC (rev 3125)
@@ -0,0 +1,327 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.framing.amqp_8_0.BasicConsumeBodyImpl;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.flow.LimitlessCreditManager;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.AMQQueueFactory;
+import org.apache.qpid.server.queue.IncomingMessage;
+import org.apache.qpid.server.queue.MockProtocolSession;
+import org.apache.qpid.server.queue.SimpleAMQQueue;
+import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.store.berkeleydb.utils.BDBVMTestCase;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
+import org.apache.qpid.server.txn.NonTransactionalContext;
+import org.apache.qpid.server.util.NullApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.commons.configuration.PropertiesConfiguration;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+public class MessagePersistenceTest extends BDBVMTestCase
+{
+
+ private static final Logger _log = Logger.getLogger(MessagePersistenceTest.class);
+
+ protected InspectableBDBMessageStore _transactionLog;
+ private String STORE_LOCATION = System.getProperty("BDB_WORK") + "/bdbTestEnv";
+ protected VirtualHost _virtualHost;
+
+ protected SimpleAMQQueue _queue1, _queue2;
+ protected AMQShortString _q1name = new AMQShortString("q1name");
+ protected AMQShortString _q2name = new AMQShortString("q2name");
+ protected AMQShortString _owner = new AMQShortString("owner");
+ protected AMQShortString _routingKey = new AMQShortString("routing key");
+ protected NonTransactionalContext _messageDeliveryContext;
+
+ protected static final long MESSAGE_SIZE = 0L;
+
+ File BDB_DIR = new File(STORE_LOCATION);
+
+ protected boolean _transactional;
+ protected boolean _ack;
+
+ public void setUp() throws Exception
+ {
+ if (BDB_DIR.exists())
+ {
+ deleteDirectory(BDB_DIR);
+ }
+
+ ApplicationRegistry.initialise(new NullApplicationRegistry());
+ ApplicationRegistry applicationRegistry = (ApplicationRegistry) ApplicationRegistry.getInstance(1);
+
+ File bdbDir = new File(STORE_LOCATION);
+ deleteDirectory(bdbDir);
+ BDB_DIR.mkdirs();
+
+ _transactionLog = new InspectableBDBMessageStore();
+ _transactionLog.configure(BDB_DIR);
+
+ _virtualHost = new VirtualHost(new VirtualHostConfiguration("bdbtest",new PropertiesConfiguration()), _transactionLog);
+ _transactionLog.setVirtualHost(_virtualHost);
+
+ applicationRegistry.getVirtualHostRegistry().registerVirtualHost(_virtualHost);
+
+ _queue1 = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_q1name, false, _owner, false, _virtualHost, null);
+ _queue2 = (SimpleAMQQueue) AMQQueueFactory.createAMQQueueImpl(_q2name, false, _owner, false, _virtualHost, null);
+ // Create IncomingMessage and nondurable queue
+ _messageDeliveryContext = new NonTransactionalContext(_transactionLog, new StoreContext(), null, null);
+
+ _transactional = false;
+ _ack = true;
+ }
+
+ protected IncomingMessage createMessage(MessagePublishInfo info) throws AMQException
+ {
+ IncomingMessage msg = new IncomingMessage(info, _messageDeliveryContext,
+ new MockProtocolSession(_transactionLog), _transactionLog);
+
+ // equivalent to amqChannel.publishContentHeader
+ ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
+ contentHeaderBody.classId = BasicConsumeBodyImpl.CLASS_ID;
+ // This message has no bodies
+ contentHeaderBody.bodySize = MESSAGE_SIZE;
+ contentHeaderBody.properties = new BasicContentHeaderProperties();
+ ((BasicContentHeaderProperties) contentHeaderBody.properties).setDeliveryMode((byte) 2);
+
+ msg.setContentHeaderBody(contentHeaderBody);
+ msg.setExpiration();
+
+ return msg;
+ }
+
+
+ /**
+ * Tests delivery of a single persistent message to a queue. Validates through reference counting that the
+ * message is removed from the TransactionLog once its single reference has been removed.
+ *
+ * @throws Exception
+ */
+ public void testSingleConsume() throws Exception
+ {
+ MessagePublishInfo info = new MessagePublishInfoImpl();
+ IncomingMessage msg = createMessage(info);
+
+ // Send persistent message
+ ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+ qs.add(_queue1);
+
+ // equivalent to amqChannel.routeMessage()
+ msg.enqueue(qs);
+
+ msg.routingComplete(_transactionLog);
+
+ // equivalent to amqChannel.deliverCurrentMessageIfComplete
+ msg.deliverToQueues();
+
+ // Check that data has been stored to disk
+ long messageId = msg.getMessageId();
+ checkMessageMetaDataExists(messageId);
+
+ // Check that it is enqueued
+ List<AMQQueue> queueList = _transactionLog.getMessageReferenceMap(messageId);
+ assertNotNull(queueList);
+ assertEquals("Message should be enqueued on both queues.", 1, queueList.size());
+ assertTrue("Queue1 not contained in list.", queueList.contains(_queue1));
+
+ // Create consumer to correctly consume message
+ AMQProtocolSession session1 = new MockProtocolSession(_transactionLog);
+ AMQChannel channel1 = new AMQChannel(session1, 1, _transactionLog);
+ session1.addChannel(channel1);
+
+ if (_transactional && _ack)
+ {
+ channel1.setLocalTransactional();
+ }
+
+ Subscription sub1 = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, session1,
+ new AMQShortString("cTag1"), _ack,
+ null, false,
+ new LimitlessCreditManager());
+
+ _queue1.registerSubscription(sub1, true);
+
+ // Give the delivery thread time to deliver the message
+ Thread.sleep(200);
+ Thread.yield();
+
+ if (_ack)
+ {
+ // Acknowledge the message
+ channel1.acknowledgeMessage(1, false);
+
+ if (_transactional)
+ {
+ channel1.commit();
+ }
+ }
+ // Check that it is now dequeued
+ queueList = _transactionLog.getMessageReferenceMap(messageId);
+ assertNull("Queue List was not empty:" + (queueList != null ? queueList.size() : ""), queueList);
+
+ checkMessageMetaDataRemoved(messageId);
+ }
+
+ /**
+ * Tests delivery of a single persistent message to two queues and validates through reference counting that the
+ * message is removed from the TransactionLog only when all references have been removed.
+ *
+ * @throws Exception
+ */
+ public void testTwoQueues() throws Exception
+ {
+ MessagePublishInfo info = new MessagePublishInfoImpl();
+ IncomingMessage msg = createMessage(info);
+
+ // Send persistent message
+ ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+ qs.add(_queue1);
+ qs.add(_queue2);
+
+ // equivalent to amqChannel.routeMessage()
+ msg.enqueue(qs);
+
+ msg.routingComplete(_transactionLog);
+
+ // equivalent to amqChannel.deliverCurrentMessageIfComplete
+ msg.deliverToQueues();
+
+ // Check that data has been stored to disk
+ long messageId = msg.getMessageId();
+ checkMessageMetaDataExists(messageId);
+
+ // Check that it is enqueued
+ List<AMQQueue> queueList = _transactionLog.getMessageReferenceMap(messageId);
+ assertNotNull(queueList);
+ assertEquals("Message should be enqueued on both queues.", 2, queueList.size());
+ assertTrue("Queue1 not contained in list.", queueList.contains(_queue1));
+ assertTrue("Queue2 not contained in list.", queueList.contains(_queue2));
+
+ // Create consumer to correctly consume message
+ AMQProtocolSession session1 = new MockProtocolSession(_transactionLog);
+ AMQChannel channel1 = new AMQChannel(session1, 1, _transactionLog);
+ session1.addChannel(channel1);
+
+ if (_transactional & _ack)
+ {
+ channel1.setLocalTransactional();
+ }
+
+ Subscription sub1 = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, session1,
+ new AMQShortString("cTag1"), _ack,
+ null, false,
+ new LimitlessCreditManager());
+ _queue1.registerSubscription(sub1, true);
+
+ // Give the delivery thread time to deliver the message
+ Thread.sleep(200);
+ Thread.yield();
+
+ if (_ack)
+ {
+ // Acknowledge the message
+ channel1.acknowledgeMessage(1, false);
+
+ if (_transactional)
+ {
+ channel1.commit();
+ }
+ }
+ // Check that data still exists
+ checkMessageMetaDataExists(messageId);
+
+ // Check that message was consumed
+ queueList = _transactionLog.getMessageReferenceMap(messageId);
+ assertNotNull(queueList);
+ assertEquals("Message Should only be enqueued on one queue.", 1, queueList.size());
+ assertTrue("Queue2 not contained in list.", queueList.contains(_queue2));
+
+ Subscription sub2 = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, session1,
+ new AMQShortString("cTag2"), _ack,
+ null, false,
+ new LimitlessCreditManager());
+
+ //Consume from second queue
+ _queue2.registerSubscription(sub2, true);
+
+ // Give the delivery thread time to deliver the message
+ Thread.sleep(200);
+ Thread.yield();
+
+ if (_ack)
+ {
+ // Acknowledge the message
+ channel1.acknowledgeMessage(2, false);
+
+ if (_transactional)
+ {
+ channel1.commit();
+ }
+ }
+ // Check that it is now dequeued
+ queueList = _transactionLog.getMessageReferenceMap(messageId);
+ assertNull("Queue List was not empty:" + (queueList != null ? queueList.size() : ""), queueList);
+
+ checkMessageMetaDataRemoved(messageId);
+ }
+
+ protected void checkMessageMetaDataExists(long messageId)
+ {
+ try
+ {
+ _transactionLog.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId);
+ }
+ catch (AMQException amqe)
+ {
+ fail("Message MetaData does not exist for message:" + messageId);
+ }
+ }
+
+ protected void checkMessageMetaDataRemoved(long messageId)
+ {
+ try
+ {
+ _transactionLog.getMessageMetaData(_messageDeliveryContext.getStoreContext(), messageId);
+ fail("Message MetaData still exists for message:" + messageId);
+ }
+ catch (AMQException amqe)
+ {
+ assertEquals("Metadata not found for message with id " + messageId, amqe.getMessage());
+ }
+ }
+
+}
Added: store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTransactionalTest.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTransactionalTest.java (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistenceTransactionalTest.java 2009-02-20 13:34:27 UTC (rev 3125)
@@ -0,0 +1,239 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.flow.LimitlessCreditManager;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.IncomingMessage;
+import org.apache.qpid.server.queue.MockProtocolSession;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class MessagePersistenceTransactionalTest extends MessagePersistenceTest
+{
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _transactional = true;
+ }
+
+ /**
+ * Tests delivery of a single persistent message to two queues to validate through reference counting that the
+ * Message is removed from the TransactionLog when all references have been removed.
+ *
+ * @throws Exception
+ */
+ public void testAckTwoQueuesInOneTx() throws Exception
+ {
+ MessagePublishInfo info = new MessagePublishInfoImpl();
+ IncomingMessage msg = createMessage(info);
+
+ // Send persistent message
+ ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+ qs.add(_queue1);
+ qs.add(_queue2);
+
+ // equivalent to amqChannel.routeMessage()
+ msg.enqueue(qs);
+
+ msg.routingComplete(_transactionLog);
+
+ // equivalent to amqChannel.deliverCurrentMessageIfComplete
+ msg.deliverToQueues();
+
+ // Check that data has been stored to disk
+ long messageId = msg.getMessageId();
+ checkMessageMetaDataExists(messageId);
+
+ // Check that it is enqueued
+ List<AMQQueue> queueList = _transactionLog.getMessageReferenceMap(messageId);
+ assertNotNull(queueList);
+ assertEquals("Message should be enqueued on both queues.", 2, queueList.size());
+ assertTrue("Queue1 not contained in list.", queueList.contains(_queue1));
+ assertTrue("Queue2 not contained in list.", queueList.contains(_queue2));
+
+ // Create consumer to correctly consume message
+ AMQProtocolSession session1 = new MockProtocolSession(_transactionLog);
+ AMQChannel channel1 = new AMQChannel(session1, 1, _transactionLog);
+ session1.addChannel(channel1);
+
+ channel1.setLocalTransactional();
+
+ Subscription sub1 = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, session1,
+ new AMQShortString("cTag1"), true,
+ null, false,
+ new LimitlessCreditManager());
+ _queue1.registerSubscription(sub1, true);
+
+ // Give the delivery thread time to deliver the message
+ Thread.sleep(250);
+ Thread.yield();
+
+ // Acknowledge the message
+ channel1.acknowledgeMessage(1, false);
+
+ // Check that data still exists
+ checkMessageMetaDataExists(messageId);
+
+ // Check that it is enqueued
+ queueList = _transactionLog.getMessageReferenceMap(messageId);
+ assertNotNull(queueList);
+ assertEquals("Message should be enqueued on both queues.", 2, queueList.size());
+ assertTrue("Queue1 not contained in list.", queueList.contains(_queue1));
+ assertTrue("Queue2 not contained in list.", queueList.contains(_queue2));
+
+ Subscription sub2 = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, session1,
+ new AMQShortString("cTag2"), true,
+ null, false,
+ new LimitlessCreditManager());
+
+ //Consume from second queue
+ _queue2.registerSubscription(sub2, true);
+
+ // Give the delivery thread time to deliver the message
+ Thread.sleep(250);
+ Thread.yield();
+
+ // Acknowledge the message
+ channel1.acknowledgeMessage(2, false);
+
+ // Check that it is enqueued
+ queueList = _transactionLog.getMessageReferenceMap(messageId);
+ assertNotNull(queueList);
+ assertEquals("Message should be enqueued on both queues.", 2, queueList.size());
+ assertTrue("Queue1 not contained in list.", queueList.contains(_queue1));
+ assertTrue("Queue2 not contained in list.", queueList.contains(_queue2));
+
+ if (_transactional)
+ {
+ channel1.commit();
+ }
+
+ // Check that it is now dequeued
+ queueList = _transactionLog.getMessageReferenceMap(messageId);
+ assertNull("Queue List was not empty:" + (queueList != null ? queueList.size() : ""), queueList);
+
+ checkMessageMetaDataRemoved(messageId);
+ }
+
+ public void testEnqueueOnTwoQueuesAckWithRollback() throws Exception
+ {
+ MessagePublishInfo info = new MessagePublishInfoImpl();
+ IncomingMessage msg = createMessage(info);
+
+ // Send persistent message
+ ArrayList<AMQQueue> qs = new ArrayList<AMQQueue>();
+ qs.add(_queue1);
+ qs.add(_queue2);
+
+ // equivalent to amqChannel.routeMessage()
+ msg.enqueue(qs);
+
+ msg.routingComplete(_transactionLog);
+
+ // equivalent to amqChannel.deliverCurrentMessageIfComplete
+ msg.deliverToQueues();
+
+ // Check that data has been stored to disk
+ long messageId = msg.getMessageId();
+ checkMessageMetaDataExists(messageId);
+
+ // Check that it is enqueued
+ List<AMQQueue> queueList = _transactionLog.getMessageReferenceMap(messageId);
+ assertNotNull(queueList);
+ assertEquals("Message should be enqueued on both queues.", 2, queueList.size());
+ assertTrue("Queue1 not contained in list.", queueList.contains(_queue1));
+ assertTrue("Queue2 not contained in list.", queueList.contains(_queue2));
+
+ // Create consumer to correctly consume message
+ AMQProtocolSession session1 = new MockProtocolSession(_transactionLog);
+ AMQChannel channel1 = new AMQChannel(session1, 1, _transactionLog);
+ session1.addChannel(channel1);
+
+ channel1.setLocalTransactional();
+
+ Subscription sub1 = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, session1,
+ new AMQShortString("cTag1"), true,
+ null, false,
+ new LimitlessCreditManager());
+ _queue1.registerSubscription(sub1, true);
+
+ // Give the delivery thread time to deliver the message
+ Thread.sleep(250);
+ Thread.yield();
+
+ // Acknowledge the message
+ channel1.acknowledgeMessage(1, false);
+
+ // Check that data still exists
+ checkMessageMetaDataExists(messageId);
+
+ // Check that it is enqueued
+ queueList = _transactionLog.getMessageReferenceMap(messageId);
+ assertNotNull(queueList);
+ assertEquals("Message should be enqueued on both queues.", 2, queueList.size());
+ assertTrue("Queue1 not contained in list.", queueList.contains(_queue1));
+ assertTrue("Queue2 not contained in list.", queueList.contains(_queue2));
+
+ AMQProtocolSession session2 = new MockProtocolSession(_transactionLog);
+ AMQChannel channel2 = new AMQChannel(session2, 1, _transactionLog);
+ session2.addChannel(channel2);
+
+ Subscription sub2 = SubscriptionFactoryImpl.INSTANCE.createSubscription(1, session2,
+ new AMQShortString("cTag2"), true,
+ null, false,
+ new LimitlessCreditManager());
+
+ //Consume from second queue
+ _queue2.registerSubscription(sub2, true);
+
+ // Give the delivery thread time to deliver the message
+ Thread.sleep(250);
+ Thread.yield();
+
+ // Acknowledge the message
+ channel2.acknowledgeMessage(1, false);
+
+ // Check that it is still enqueued for queue1
+ queueList = _transactionLog.getMessageReferenceMap(messageId);
+ assertNotNull(queueList);
+ assertEquals("Message should be enqueued on both queues.", 1, queueList.size());
+ assertTrue("Queue1 not contained in list.", queueList.contains(_queue1));
+
+ channel1.rollback();
+
+ queueList = _transactionLog.getMessageReferenceMap(messageId);
+ assertNotNull(queueList);
+ assertEquals("Message should be enqueued on both queues.", 1, queueList.size());
+ assertTrue("Queue1 not contained in list.", queueList.contains(_queue1));
+
+ checkMessageMetaDataExists(messageId);
+ }
+
+}
Added: store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistentNoAckTest.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistentNoAckTest.java (rev 0)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessagePersistentNoAckTest.java 2009-02-20 13:34:27 UTC (rev 3125)
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store.berkeleydb;
+
+public class MessagePersistentNoAckTest extends MessagePersistenceTest
+{
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ _ack = false;
+ }
+}
Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageReSendTest.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageReSendTest.java 2009-02-19 19:02:07 UTC (rev 3124)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/MessageReSendTest.java 2009-02-20 13:34:27 UTC (rev 3125)
@@ -30,6 +30,9 @@
import org.apache.qpid.server.store.berkeleydb.utils.BDBVMTestCase;
+/**
+ * Note this test uses QpidTestCase and running 'ant test' does not set up the required properties for this.
+ */
public class MessageReSendTest extends BDBVMTestCase
{
protected static final String MESSAGE_ID_PROPERTY = "MessageIDProperty";
Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/QueueDeleteWhilstRoutingTest.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/QueueDeleteWhilstRoutingTest.java 2009-02-19 19:02:07 UTC (rev 3124)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/QueueDeleteWhilstRoutingTest.java 2009-02-20 13:34:27 UTC (rev 3125)
@@ -37,6 +37,10 @@
import javax.naming.NamingException;
import java.io.File;
+/**
+ * Note this test uses QpidTestCase and running 'ant test' does not set up the required properties for this.
+ */
+
public class QueueDeleteWhilstRoutingTest extends BDBVMTestCase
{
private static final Logger _logger = Logger.getLogger(QueueDeleteWhilstRoutingTest.class);
Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java 2009-02-19 19:02:07 UTC (rev 3124)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/utils/JNDIHelper.java 2009-02-20 13:34:27 UTC (rev 3125)
@@ -37,7 +37,7 @@
public JNDIHelper(String broker)
{
- CONNECTION_NAME = "amqp://guest:guest@clientid/test?brokerlist='" + broker + "'";
+ CONNECTION_NAME = "amqp://guest:guest@clientid/bdbtest?brokerlist='" + broker + "'";
setupJNDI();
}
Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java 2009-02-19 19:02:07 UTC (rev 3124)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/tools/java/org/apache/qpid/server/store/berkeleydb/BDBStoreUpgrade.java 2009-02-20 13:34:27 UTC (rev 3125)
@@ -26,6 +26,7 @@
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.AMQException;
import org.apache.qpid.util.FileUtils;
import org.apache.commons.cli.PosixParser;
@@ -34,6 +35,7 @@
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.configuration.PropertiesConfiguration;
import java.io.File;
import java.io.BufferedReader;
@@ -327,8 +329,8 @@
}
// Note the name of the Vhosts is not important, the store doesnot record the name of the vhost.
- _newVirtualHost = new VirtualHost("Upgraded", new MemoryMessageStore());
- _oldVirtualHost = new VirtualHost("Old", new MemoryMessageStore());
+ _newVirtualHost = new VirtualHost(new VirtualHostConfiguration("Upgraded", new PropertiesConfiguration()), new MemoryMessageStore());
+ _oldVirtualHost = new VirtualHost(new VirtualHostConfiguration("Old", new PropertiesConfiguration()), new MemoryMessageStore());
//Create a new messageStore
_newMessageStore = new BDBMessageStore();
16 years
rhmessaging commits: r3124 - in store/trunk/cpp: tests and 1 other directories.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2009-02-19 14:02:07 -0500 (Thu, 19 Feb 2009)
New Revision: 3124
Added:
store/trunk/cpp/lib/TxnCtxt.cpp
Modified:
store/trunk/cpp/lib/Makefile.am
store/trunk/cpp/lib/MessageStoreImpl.cpp
store/trunk/cpp/lib/TxnCtxt.h
store/trunk/cpp/tests/SimpleTest.cpp
store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py
Log:
Fix for BZ486418 - "qpidd+store The extra xids encountered after qpidd recovery from journal". Some minor format improvements to the python journal analysis tool.
Modified: store/trunk/cpp/lib/Makefile.am
===================================================================
--- store/trunk/cpp/lib/Makefile.am 2009-02-19 18:57:30 UTC (rev 3123)
+++ store/trunk/cpp/lib/Makefile.am 2009-02-19 19:02:07 UTC (rev 3124)
@@ -49,6 +49,7 @@
MessageStoreImpl.cpp \
PreparedTransaction.cpp \
StringDbt.cpp \
+ TxnCtxt.cpp \
BindingDbt.h \
BufferValue.h \
Cursor.h \
Modified: store/trunk/cpp/lib/MessageStoreImpl.cpp
===================================================================
--- store/trunk/cpp/lib/MessageStoreImpl.cpp 2009-02-19 18:57:30 UTC (rev 3123)
+++ store/trunk/cpp/lib/MessageStoreImpl.cpp 2009-02-19 19:02:07 UTC (rev 3124)
@@ -29,6 +29,7 @@
#include "jrnl/txn_map.hpp"
#include "qpid/log/Statement.h"
#include "qmf/com/redhat/rhm/store/Package.h"
+#include "StoreException.h"
#define MAX_AIO_SLEEPS 1000 // ~1 second
#define AIO_SLEEP_TIME 1000 // 1 milisecond
Added: store/trunk/cpp/lib/TxnCtxt.cpp
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.cpp (rev 0)
+++ store/trunk/cpp/lib/TxnCtxt.cpp 2009-02-19 19:02:07 UTC (rev 3124)
@@ -0,0 +1,162 @@
+#include "TxnCtxt.h"
+
+#include <sstream>
+#include <unistd.h> // ::usleep()
+
+#include "jrnl/jexception.hpp"
+#include "StoreException.h"
+
+namespace mrg {
+namespace msgstore {
+
+void TxnCtxt::completeTxn(bool commit) {
+ sync();
+ for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
+ commitTxn(static_cast<JournalImpl*>(*i), commit);
+ }
+ impactedQueues.clear();
+ if (preparedXidStorePtr)
+ commitTxn(preparedXidStorePtr, commit);
+}
+
+void TxnCtxt::commitTxn(JournalImpl* jc, bool commit) {
+ if (jc && loggedtx) { /* if using journal */
+ boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
+ dtokp->addRef();
+ dtokp->set_external_rid(true);
+ dtokp->set_rid(loggedtx->next());
+ try {
+ if (commit) {
+ jc->txn_commit(dtokp.get(), getXid());
+ sync();
+ } else {
+ jc->txn_abort(dtokp.get(), getXid());
+ }
+ } catch (const journal::jexception& e) {
+ THROW_STORE_EXCEPTION(std::string("Error commit") + e.what());
+ }
+ }
+}
+
+//static
+uuid_t TxnCtxt::uuid;
+
+// static
+IdSequence TxnCtxt::uuidSeq;
+
+// static
+bool TxnCtxt::staticInit = TxnCtxt::setUuid();
+
+// static
+bool TxnCtxt::setUuid() {
+ ::uuid_generate(uuid);
+ return true;
+}
+
+TxnCtxt::TxnCtxt(IdSequence* _loggedtx) : loggedtx(_loggedtx), dtokp(new DataTokenImpl), preparedXidStorePtr(0), txn(0) {
+ if (loggedtx) {
+// // Human-readable tid: 53 bytes
+// // uuid_t is a char[16]
+// tid.reserve(53);
+// u_int64_t* u1 = (u_int64_t*)uuid;
+// u_int64_t* u2 = (u_int64_t*)(uuid + sizeof(u_int64_t));
+// std::stringstream s;
+// s << "tid:" << std::hex << std::setfill('0') << std::setw(16) << uuidSeq.next() << ":" << std::setw(16) << *u1 << std::setw(16) << *u2;
+// tid.assign(s.str());
+
+ // Binary tid: 24 bytes
+ tid.reserve(24);
+ u_int64_t c = uuidSeq.next();
+ tid.append((char*)&c, sizeof(c));
+ tid.append((char*)&uuid, sizeof(uuid));
+ }
+}
+
+TxnCtxt::TxnCtxt(std::string _tid, IdSequence* _loggedtx) : loggedtx(_loggedtx), dtokp(new DataTokenImpl), preparedXidStorePtr(0), tid(_tid), txn(0) {}
+
+TxnCtxt::~TxnCtxt() { if(txn) abort(); }
+
+#define MAX_SYNC_SLEEPS 5000 // ~1 second
+#define SYNC_SLEEP_TIME 200 // 0.2 ms
+
+void TxnCtxt::sync() {
+ bool allWritten = false;
+ bool firstloop = true;
+ long sleep_cnt = 0L;
+ while (loggedtx && !allWritten) {
+ if (sleep_cnt > MAX_SYNC_SLEEPS) THROW_STORE_EXCEPTION(std::string("Error: timeout waiting for TxnCtxt::sync()"));
+ if (!firstloop) {
+ ::usleep(SYNC_SLEEP_TIME);
+ sleep_cnt++;
+ } // move this into the get events call aiolib..
+ allWritten = true;
+ for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
+ sync_jrnl(static_cast<JournalImpl*>(*i), firstloop, allWritten);
+ }
+ if (preparedXidStorePtr)
+ sync_jrnl(preparedXidStorePtr, firstloop, allWritten);
+ firstloop = false;
+ }
+}
+
+void TxnCtxt::sync_jrnl(JournalImpl* jc, bool firstloop, bool& allWritten) {
+ try {
+ if (jc && !(jc->is_txn_synced(getXid()))) {
+ if (firstloop)
+ jc->flush();
+ allWritten = false;
+ jc->get_wr_events();
+ }
+ } catch (const journal::jexception& e) {
+ THROW_STORE_EXCEPTION(std::string("Error sync") + e.what());
+ }
+}
+
+void TxnCtxt::begin(DbEnv& env, bool sync) {
+ env.txn_begin(0, &txn, 0);
+ if (sync)
+ globalHolder = AutoScopedLock(new qpid::sys::Mutex::ScopedLock(globalSerialiser));
+}
+
+void TxnCtxt::commit() {
+ if (txn) {
+ txn->commit(0);
+ txn = 0;
+ globalHolder.reset();
+ }
+}
+
+void TxnCtxt::abort(){
+ if (txn) {
+ txn->abort();
+ txn = 0;
+ globalHolder.reset();
+ }
+}
+
+DbTxn* TxnCtxt::get() { return txn; }
+
+bool TxnCtxt::isTPC() { return false; }
+
+const std::string& TxnCtxt::getXid() { return tid; }
+
+void TxnCtxt::addXidRecord(qpid::broker::ExternalQueueStore* queue) { impactedQueues.insert(queue); }
+
+void TxnCtxt::complete(bool commit) { completeTxn(commit); }
+
+bool TxnCtxt::impactedQueuesEmpty() { return impactedQueues.empty(); }
+
+DataTokenImpl* TxnCtxt::getDtok() { return dtokp.get(); }
+
+void TxnCtxt::incrDtokRef() { dtokp->addRef(); }
+
+void TxnCtxt::recoverDtok(const u_int64_t rid, const std::string xid) {
+ dtokp->set_rid(rid);
+ dtokp->set_wstate(DataTokenImpl::ENQ);
+ dtokp->set_xid(xid);
+ dtokp->set_external_rid(true);
+}
+
+TPCTxnCtxt::TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TxnCtxt(_loggedtx), xid(_xid) {}
+
+}}
Modified: store/trunk/cpp/lib/TxnCtxt.h
===================================================================
--- store/trunk/cpp/lib/TxnCtxt.h 2009-02-19 18:57:30 UTC (rev 3123)
+++ store/trunk/cpp/lib/TxnCtxt.h 2009-02-19 19:02:07 UTC (rev 3124)
@@ -24,23 +24,19 @@
#ifndef _TxnCtxt_
#define _TxnCtxt_
-#include <boost/format.hpp>
#include <boost/intrusive_ptr.hpp>
#include <db-inc.h>
#include <memory>
#include <set>
-#include <sstream>
#include <string>
-#include <unistd.h> // ::usleep()
-
+
#include "DataTokenImpl.h"
#include "IdSequence.h"
#include "JournalImpl.h"
-#include "jrnl/jexception.hpp"
#include "qpid/broker/PersistableQueue.h"
#include "qpid/broker/TransactionalStore.h"
#include "qpid/sys/Mutex.h"
-#include "StoreException.h"
+#include "qpid/sys/uuid.h"
namespace mrg {
namespace msgstore {
@@ -50,6 +46,11 @@
protected:
static qpid::sys::Mutex globalSerialiser;
+ static uuid_t uuid;
+ static IdSequence uuidSeq;
+ static bool staticInit;
+ static bool setUuid();
+
typedef std::set<qpid::broker::ExternalQueueStore*> ipqdef;
typedef ipqdef::iterator ipqItr;
typedef std::auto_ptr<qpid::sys::Mutex::ScopedLock> AutoScopedLock;
@@ -67,133 +68,45 @@
std::string tid;
DbTxn* txn;
- virtual void completeTxn(bool commit) {
- sync();
- for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
- commitTxn(static_cast<JournalImpl*>(*i), commit);
- }
- impactedQueues.clear();
- if (preparedXidStorePtr)
- commitTxn(preparedXidStorePtr, commit);
- }
+ virtual void completeTxn(bool commit);
+ void commitTxn(JournalImpl* jc, bool commit);
- void commitTxn(JournalImpl* jc, bool commit) {
- if (jc && loggedtx) { /* if using journal */
- boost::intrusive_ptr<DataTokenImpl> dtokp(new DataTokenImpl);
- dtokp->addRef();
- dtokp->set_external_rid(true);
- dtokp->set_rid(loggedtx->next());
- try {
- if (commit) {
- jc->txn_commit(dtokp.get(), getXid());
- sync();
- } else {
- jc->txn_abort(dtokp.get(), getXid());
- }
- } catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Error commit") + e.what());
- }
- }
- }
-
public:
+ TxnCtxt(IdSequence* _loggedtx=NULL);
+ TxnCtxt(std::string _tid, IdSequence* _loggedtx);
+ virtual ~TxnCtxt();
- TxnCtxt(IdSequence* _loggedtx=NULL) : loggedtx(_loggedtx), dtokp(new DataTokenImpl), preparedXidStorePtr(0), txn(0) {
- if (loggedtx) {
- std::stringstream s;
- s << "rhm-tid" << this;
- tid.assign(s.str());
- }
- }
-
- TxnCtxt(std::string _tid, IdSequence* _loggedtx) : loggedtx(_loggedtx), dtokp(new DataTokenImpl), preparedXidStorePtr(0), tid(_tid), txn(0) {}
-
/**
* Call to make sure all the data for this txn is written to safe store
*
*@return if the data sucessfully synced.
*/
+ void sync();
+ void sync_jrnl(JournalImpl* jc, bool firstloop, bool& allWritten);
+ void begin(DbEnv& env, bool sync = false);
+ void commit();
+ void abort();
+ DbTxn* get();
+ virtual bool isTPC();
+ virtual const std::string& getXid();
- virtual ~TxnCtxt() { if(txn) abort(); }
-
-#define MAX_SYNC_SLEEPS 5000 // ~1 second
-#define SYNC_SLEEP_TIME 200 // 0.2 ms
-
- void sync() {
- bool allWritten = false;
- bool firstloop = true;
- long sleep_cnt = 0L;
- while (loggedtx && !allWritten) {
- if (sleep_cnt > MAX_SYNC_SLEEPS) THROW_STORE_EXCEPTION(std::string("Error: timeout waiting for TxnCtxt::sync()"));
- if (!firstloop) { ::usleep(SYNC_SLEEP_TIME); sleep_cnt++; } // move this into the get events call aiolib..
- allWritten = true;
- for (ipqItr i = impactedQueues.begin(); i != impactedQueues.end(); i++) {
- sync_jrnl(static_cast<JournalImpl*>(*i), firstloop, allWritten);
- }
- if (preparedXidStorePtr)
- sync_jrnl(preparedXidStorePtr, firstloop, allWritten);
- firstloop = false;
- }
- }
-
- void sync_jrnl(JournalImpl* jc, bool firstloop, bool& allWritten) {
- try {
- if (jc && !(jc->is_txn_synced(getXid()))) {
- if (firstloop) jc->flush();
- allWritten = false;
- jc->get_wr_events();
- }
- } catch (const journal::jexception& e) {
- THROW_STORE_EXCEPTION(std::string("Error sync") + e.what());
- }
- }
-
- void begin(DbEnv& env, bool sync = false) {
- env.txn_begin(0, &txn, 0);
- if (sync) globalHolder = AutoScopedLock(new qpid::sys::Mutex::ScopedLock(globalSerialiser));
- }
-
- void commit() {
- if (txn) {
- txn->commit(0);
- txn = 0;
- globalHolder.reset();
- }
- }
-
- void abort(){
- if (txn) {
- txn->abort();
- txn = 0;
- globalHolder.reset();
- }
- }
-
- DbTxn* get() { return txn; }
- virtual bool isTPC() { return false; }
- virtual const std::string& getXid() { return tid; }
-
- void addXidRecord(qpid::broker::ExternalQueueStore* queue) { impactedQueues.insert(queue); }
+ void addXidRecord(qpid::broker::ExternalQueueStore* queue);
inline void prepare(JournalImpl* _preparedXidStorePtr) { preparedXidStorePtr = _preparedXidStorePtr; }
- void complete(bool commit) { completeTxn(commit); }
- bool impactedQueuesEmpty() { return impactedQueues.empty(); }
- DataTokenImpl* getDtok() { return dtokp.get(); }
- void incrDtokRef() { dtokp->addRef(); }
- void recoverDtok(const u_int64_t rid, const std::string xid) {
- dtokp->set_rid(rid);
- dtokp->set_wstate(DataTokenImpl::ENQ);
- dtokp->set_xid(xid);
- dtokp->set_external_rid(true);
- }
+ void complete(bool commit);
+ bool impactedQueuesEmpty();
+ DataTokenImpl* getDtok();
+ void incrDtokRef();
+ void recoverDtok(const u_int64_t rid, const std::string xid);
};
+
class TPCTxnCtxt : public TxnCtxt, public qpid::broker::TPCTransactionContext
{
protected:
const std::string xid;
public:
- TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx) : TxnCtxt(_loggedtx), xid(_xid) {}
+ TPCTxnCtxt(const std::string& _xid, IdSequence* _loggedtx);
inline virtual bool isTPC() { return true; }
inline virtual const std::string& getXid() { return xid; }
};
Modified: store/trunk/cpp/tests/SimpleTest.cpp
===================================================================
--- store/trunk/cpp/tests/SimpleTest.cpp 2009-02-19 18:57:30 UTC (rev 3123)
+++ store/trunk/cpp/tests/SimpleTest.cpp 2009-02-19 19:02:07 UTC (rev 3124)
@@ -26,6 +26,7 @@
#include "MessageStoreImpl.h"
#include <iostream>
#include "MessageUtils.h"
+#include "StoreException.h"
#include <qpid/broker/Queue.h>
#include <qpid/broker/RecoveryManagerImpl.h>
#include <qpid/framing/AMQHeaderBody.h>
Modified: store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py 2009-02-19 18:57:30 UTC (rev 3123)
+++ store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py 2009-02-19 19:02:07 UTC (rev 3124)
@@ -48,6 +48,10 @@
transient_mask = 0x10
extern_mask = 0x20
+printchars = '0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~ '
+
+
+
#== global functions ===========================================================
def load(f, klass):
@@ -78,7 +82,7 @@
return f.tell() >= jfsize
def isprintable(s):
- return s.strip(string.printable) == ''
+ return s.strip(printchars) == ''
def print_xid(xidsize, xid):
if xid == None:
@@ -107,6 +111,8 @@
def hex_split_str(s, split_size = 50):
if len(s) <= split_size:
return hex_str(s, 0, len(s))
+ if len(s) > split_size + 25:
+ return hex_str(s, 0, 10) + ' ... ' + hex_str(s, 55, 65) + ' ... ' + hex_str(s, len(s)-10, len(s))
return hex_str(s, 0, 10) + ' ... ' + hex_str(s, len(s)-10, len(s))
def hex_str(s, b, e):
@@ -118,11 +124,10 @@
o += '\\%02x' % ord(s[i])
return o
-def split_str(s):
- if len(s) > 25:
- return s[:12] + ' ... ' + s[-10:]
- else:
+def split_str(s, split_size = 50):
+ if len(s) < split_size:
return s
+ return s[:25] + ' ... ' + s[-25:]
def inv_str(s):
si = ''
@@ -587,7 +592,7 @@
if len(mismatched_rids) > 0:
warn = ' (WARNING: transactional dequeues not found in enqueue map; rids=%s)' % mismatched_rids
else:
- warn = ' (WARNING: xid %s not found in transaction map)' % hdr.xid
+ warn = ' (WARNING: %s not found in transaction map)' % print_xid(len(hdr.xid), hdr.xid)
if not self.qflag: print ' > %s%s' % (hdr, warn)
if not stop:
stop = (self.last_file and hdr.check()) or hdr.empty() or self.fhdr.empty()
@@ -808,7 +813,7 @@
print
print 'Remaining transactions: '
for t in self.tmap:
- print "xid=%s:" % t
+ print_xid(len(t), t)
for r in self.tmap[t]:
print " fid=%d %s" % (r[0], r[1])
print " Total: %d records for xid %s" % (len(self.tmap[t]), t)
16 years
rhmessaging commits: r3123 - mgmt/trunk/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2009-02-19 13:57:30 -0500 (Thu, 19 Feb 2009)
New Revision: 3123
Modified:
mgmt/trunk/cumin/python/cumin/tools.py
Log:
Drop root privileges
Modified: mgmt/trunk/cumin/python/cumin/tools.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/tools.py 2009-02-18 15:14:54 UTC (rev 3122)
+++ mgmt/trunk/cumin/python/cumin/tools.py 2009-02-19 18:57:30 UTC (rev 3123)
@@ -57,6 +57,10 @@
def init(self):
super(BaseCuminTool, self).init()
+ if os.getuid() == 0:
+ # Drop privileges
+ os.setuid(os.stat(__file__).st_uid)
+
self.config.init()
try:
16 years
rhmessaging commits: r3122 - store/trunk/cpp/tests/jrnl/jtt.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2009-02-18 10:14:54 -0500 (Wed, 18 Feb 2009)
New Revision: 3122
Modified:
store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py
Log:
Fix for jfile_chk.py tool which has a bug in the abort handling path
Modified: store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py
===================================================================
--- store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py 2009-02-17 16:29:27 UTC (rev 3121)
+++ store/trunk/cpp/tests/jrnl/jtt/jfile_chk.py 2009-02-18 15:14:54 UTC (rev 3122)
@@ -580,7 +580,7 @@
elif hdr.magic[-1] == 'a': # abort
for rec in self.tmap[hdr.xid]:
if isinstance(rec[1], DeqHdr):
- if self.emap[rec[1].deq_rid] != None:
+ if rec[1].deq_rid in self.emap:
t = self.emap[rec[1].deq_rid]
self.emap[rec[1].deq_rid] = (t[0], t[1], False) # Unlock enq record
del self.tmap[hdr.xid]
16 years
rhmessaging commits: r3121 - mgmt/trunk/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: eallen
Date: 2009-02-17 11:29:27 -0500 (Tue, 17 Feb 2009)
New Revision: 3121
Modified:
mgmt/trunk/cumin/python/cumin/brokerlink.py
mgmt/trunk/cumin/python/cumin/model.py
Log:
Added new sync argument to link.bridge method and defaulted it to 0
Modified: mgmt/trunk/cumin/python/cumin/brokerlink.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/brokerlink.py 2009-02-16 19:03:14 UTC (rev 3120)
+++ mgmt/trunk/cumin/python/cumin/brokerlink.py 2009-02-17 16:29:27 UTC (rev 3121)
@@ -274,7 +274,8 @@
class PeerStatus(CuminStatus):
def render_color(self, session, link):
- return link.statsCurr.lastError and "red" or "green"
+ if link.statsCurr:
+ return link.statsCurr.lastError and "red" or "green"
def render_peer_state(self, session, peer):
return peer.statsCurr.state
@@ -417,7 +418,10 @@
self.dynamic = self.DynamicField(app, "dynamic")
self.more.add_field(self.dynamic)
-
+
+ self.sync = self.SyncField(app, "sync")
+ self.more.add_field(self.sync)
+
def get_args(self, session):
return self.frame.get_args(session)
@@ -431,6 +435,19 @@
if not self.excludes.get(session):
self.excludes.set(session, "%s:%s" % (args[0].host, args[0].port))
+ if not self.sync.get(session):
+ self.sync.set(session, self.sync.get_default(session))
+
+ class SyncField(IntegerField):
+ def render_title(self, session):
+ return "Ack"
+
+ def render_field_help(self, session):
+ return "Acknowledge transfers over the bridge in batches of N"
+
+ def get_default(self, session):
+ return 0
+
class DynamicField(TwoOptionRadioField):
def render_title(self, session):
return "Dynamic Route?"
@@ -466,13 +483,15 @@
exchange = Exchange.get(int(exchange_id))
durable = link.durable
dynamic = self.dynamic.get(session) == "yes"
-
+ sync = self.sync.get(session)
+
args = {"durable": durable,
"exchange": exchange.name,
"key": key,
"tag": tag,
"excludes": excludes,
- "dynamic": dynamic
+ "dynamic": dynamic,
+ "sync": sync
}
action = self.app.model.link.bridge
Modified: mgmt/trunk/cumin/python/cumin/model.py
===================================================================
--- mgmt/trunk/cumin/python/cumin/model.py 2009-02-16 19:03:14 UTC (rev 3120)
+++ mgmt/trunk/cumin/python/cumin/model.py 2009-02-17 16:29:27 UTC (rev 3121)
@@ -1632,11 +1632,12 @@
tag = args["tag"]
dynamic = args["dynamic"]
excludes = args["excludes"]
+ sync = args["sync"]
link.bridge(self.model.data, completion,
durable, src, dest, key,
- tag, excludes, False, False, dynamic)
-
+ tag, excludes, False, False, dynamic, sync)
+
class Close(CuminAction):
def show(self, session, link):
frame = self.cumin_class.show_object(session, link)
16 years
rhmessaging commits: r3120 - mgmt/trunk/mint/python/mint.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2009-02-16 14:03:14 -0500 (Mon, 16 Feb 2009)
New Revision: 3120
Modified:
mgmt/trunk/mint/python/mint/sql.py
Log:
Expire jobs based on delete time, not update time
Modified: mgmt/trunk/mint/python/mint/sql.py
===================================================================
--- mgmt/trunk/mint/python/mint/sql.py 2009-02-16 09:20:11 UTC (rev 3119)
+++ mgmt/trunk/mint/python/mint/sql.py 2009-02-16 19:03:14 UTC (rev 3120)
@@ -151,6 +151,7 @@
def generate(self):
table = self.cls.sqlmeta.table
+
if table.endswith("_stats"):
parent_table = table[0:table.find("_stats")]
sql = """
@@ -159,11 +160,12 @@
""" % (table)
if self.keepCurrStats:
sql += " and id not in (select stats_curr_id from %s)" % (parent_table)
- else:
+ elif self.cls is mint.Job:
sql = """
- delete from %s
- where qmf_create_time < now() - interval '%%(threshold)s seconds'
+ delete from %s
+ where qmf_delete_time < now() - interval '%%(threshold)s seconds'
""" % (table)
+
return sql
class SqlGetBrokerRegistration(SqlOperation):
16 years
rhmessaging commits: r3119 - in store/branches/java/broker-queue-refactor/java/bdbstore/src: test/java/org/apache/qpid/server/store/berkeleydb and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: ritchiem
Date: 2009-02-16 04:20:11 -0500 (Mon, 16 Feb 2009)
New Revision: 3119
Modified:
store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
Log:
Update of BDBStore to use the new TransactionalLog
Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2009-02-13 15:13:27 UTC (rev 3118)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStore.java 2009-02-16 09:20:11 UTC (rev 3119)
@@ -49,8 +49,6 @@
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.PersistentAMQMessage;
import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.queue.TransientAMQMessage;
-import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.store.berkeleydb.tuples.BindingTupleBindingFactory;
import org.apache.qpid.server.store.berkeleydb.tuples.QueueTuple;
@@ -58,6 +56,8 @@
import org.apache.qpid.server.txn.NonTransactionalContext;
import org.apache.qpid.server.txn.TransactionalContext;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.transactionlog.TransactionLog;
+import org.apache.qpid.server.routing.RoutingTable;
import java.io.File;
import java.util.ArrayList;
@@ -74,14 +74,14 @@
import java.util.concurrent.atomic.AtomicReference;
/**
- * BDBMessageStore implements a persistent {@link MessageStore} using the BDB high performance log.
+ * BDBMessageStore implements a persistent {@link TransactionLog} using the BDB high performance log.
*
* <p/><table id="crc"><caption>CRC Card</caption> <tr><th> Responsibilities <th> Collaborations <tr><td> Accept
* transaction boundary demarcations: Begin, Commit, Abort. <tr><td> Store and remove queues. <tr><td> Store and remove
* exchanges. <tr><td> Store and remove messages. <tr><td> Bind and unbind queues to exchanges. <tr><td> Enqueue and
* dequeue messages to queues. <tr><td> Generate message identifiers. </table>
*/
-public class BDBMessageStore implements MessageStore
+public class BDBMessageStore implements TransactionLog, RoutingTable
{
private static final Logger _log = Logger.getLogger(BDBMessageStore.class);
Modified: store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java
===================================================================
--- store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java 2009-02-13 15:13:27 UTC (rev 3118)
+++ store/branches/java/broker-queue-refactor/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBStoreTest.java 2009-02-16 09:20:11 UTC (rev 3119)
@@ -104,7 +104,7 @@
env.addProperty("store.class", "org.apache.qpid.server.store.berkeleydb.BDBMessageStore");
_virtualHost = new VirtualHost("test", env);
- _store = (BDBMessageStore) _virtualHost.getMessageStore();
+ _store = (BDBMessageStore) _virtualHost.getTransactionLog();
}
public void tearDown() throws Exception
16 years