rhmessaging commits: r1018 - store/trunk/cpp/lib/jrnl.
by rhmessaging-commits@lists.jboss.org
Author: cctrieloff
Date: 2007-10-12 09:46:08 -0400 (Fri, 12 Oct 2007)
New Revision: 1018
Modified:
store/trunk/cpp/lib/jrnl/enq_map.cpp
store/trunk/cpp/lib/jrnl/enq_map.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
Log:
correct logic for rid to be used in locked map for dequeue
Modified: store/trunk/cpp/lib/jrnl/enq_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_map.cpp 2007-10-12 13:43:02 UTC (rev 1017)
+++ store/trunk/cpp/lib/jrnl/enq_map.cpp 2007-10-12 13:46:08 UTC (rev 1018)
@@ -95,7 +95,7 @@
}
const u_int16_t
-enq_map::get_remove_fid(const u_int64_t rid) throw (jexception)
+enq_map::get_remove_fid(const u_int64_t rid, const bool tx_flag) throw (jexception)
{
pthread_mutex_lock(&_mutex);
emap_itr itr = _map.find(rid);
@@ -106,7 +106,7 @@
ss << std::hex << "rid=0x" << rid;
throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "enq_map", "get_remove_fid");
}
- if (itr->second.second) // locked
+ if (itr->second.second && !tx_flag) // locked
{
pthread_mutex_unlock(&_mutex);
std::stringstream ss;
Modified: store/trunk/cpp/lib/jrnl/enq_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_map.hpp 2007-10-12 13:43:02 UTC (rev 1017)
+++ store/trunk/cpp/lib/jrnl/enq_map.hpp 2007-10-12 13:46:08 UTC (rev 1018)
@@ -74,7 +74,7 @@
void insert_fid(const u_int64_t rid, const u_int16_t fid) throw (jexception);
void insert_fid(const u_int64_t rid, const u_int16_t fid, bool locked) throw (jexception);
const u_int16_t get_fid(const u_int64_t rid) throw (jexception);
- const u_int16_t get_remove_fid(const u_int64_t rid) throw (jexception);
+ const u_int16_t get_remove_fid(const u_int64_t rid, const bool tx_flag=false) throw (jexception);
void lock(const u_int64_t rid) throw (jexception);
void unlock(const u_int64_t rid) throw (jexception);
const bool is_locked(const u_int64_t rid) throw (jexception);
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-10-12 13:43:02 UTC (rev 1017)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-10-12 13:46:08 UTC (rev 1018)
@@ -569,7 +569,7 @@
_emap.insert_fid(itr->_rid, itr->_fid);
else // txn dequeue
{
- u_int16_t fid = _emap.get_remove_fid(dtokp->rid());
+ u_int16_t fid = _emap.get_remove_fid(itr->_rid, true);
_wrfc.decr_enqcnt(fid);
}
}
18 years, 6 months
rhmessaging commits: r1017 - in mgmt: notes and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-10-12 09:43:02 -0400 (Fri, 12 Oct 2007)
New Revision: 1017
Modified:
mgmt/cumin/python/cumin/page.strings
mgmt/cumin/python/cumin/server.strings
mgmt/notes/Todo
Log:
Adds a simple comma separated list style for ULs and uses it for
server groups in the server properties box.
Modified: mgmt/cumin/python/cumin/page.strings
===================================================================
--- mgmt/cumin/python/cumin/page.strings 2007-10-12 13:30:11 UTC (rev 1016)
+++ mgmt/cumin/python/cumin/page.strings 2007-10-12 13:43:02 UTC (rev 1017)
@@ -341,6 +341,22 @@
border: 1px dotted #ddd;
}
+ul.comma {
+ list-style: none;
+}
+
+ul.comma li {
+ display: inline;
+}
+
+ul.comma li:after {
+ content: ", "
+}
+
+ul.comma li:last-child:after {
+ content: ""
+}
+
[CuminPage.html]
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
Modified: mgmt/cumin/python/cumin/server.strings
===================================================================
--- mgmt/cumin/python/cumin/server.strings 2007-10-12 13:30:11 UTC (rev 1016)
+++ mgmt/cumin/python/cumin/server.strings 2007-10-12 13:43:02 UTC (rev 1017)
@@ -38,6 +38,9 @@
{tabs}
</div>
+[ServerServerGroups.html]
+<ul class="comma">{items}</ul>
+
[ConfigTab.html]
<ul class="actions">
<li><a href="">Edit This Configuration</a></li>
Modified: mgmt/notes/Todo
===================================================================
--- mgmt/notes/Todo 2007-10-12 13:30:11 UTC (rev 1016)
+++ mgmt/notes/Todo 2007-10-12 13:43:02 UTC (rev 1017)
@@ -61,11 +61,11 @@
* Add creation dates to some objects
- * cumindev: Bind .strings to html-mode
-
* cumindev: add a cumin-test function and bind it to C-c C-c
* Consider adding a set_object to Frame, instead of having
set_somethingspecific on each frame.
* Remove log messages from host template view
+
+ * Consider having a cssclass set on widgets
18 years, 6 months
rhmessaging commits: r1016 - mgmt/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-10-12 09:30:11 -0400 (Fri, 12 Oct 2007)
New Revision: 1016
Modified:
mgmt/cumin/python/cumin/exchange.py
mgmt/cumin/python/cumin/exchange.strings
Log:
Makes the exchange view read only.
Modified: mgmt/cumin/python/cumin/exchange.py
===================================================================
--- mgmt/cumin/python/cumin/exchange.py 2007-10-12 13:14:51 UTC (rev 1015)
+++ mgmt/cumin/python/cumin/exchange.py 2007-10-12 13:30:11 UTC (rev 1016)
@@ -66,24 +66,12 @@
self.view = ExchangeView(app, "view")
self.add_child(self.view)
- self.edit = ExchangeEdit(app, "edit")
- self.add_child(self.edit)
-
- self.remove = ExchangeRemove(app, "remove")
- self.add_child(self.remove)
-
def set_exchange(self, session, exchange):
return self.param.set(session, exchange)
def show_view(self, session):
return self.show_mode(session, self.view)
- def show_edit(self, session):
- return self.show_mode(session, self.edit)
-
- def show_remove(self, session):
- return self.show_mode(session, self.remove)
-
def render_href(self, session, exchange):
branch = session.branch()
self.show_view(branch)
@@ -122,24 +110,10 @@
else:
raise Exception()
- def render_edit_exchange_href(self, session, exchange):
- branch = session.branch()
- self.page().show_exchange(branch, exchange).show_edit(branch)
- return branch.marshal()
-
- def render_remove_exchange_href(self, session, exchange):
- branch = session.branch()
- self.page().show_exchange(branch, exchange).show_remove(branch)
- return branch.marshal()
-
class ExchangeBindingSet(ItemSet):
def render_title(self, session, exchange):
return "Bindings (%i)" % len(exchange.binding_items())
- def render_add_binding_href(self, session, exchange):
- branch = session.branch()
- return branch.marshal()
-
def get_items(self, session, exchange):
return sorted(exchange.binding_items(), cmp,
lambda x: x.get_queue().name)
Modified: mgmt/cumin/python/cumin/exchange.strings
===================================================================
--- mgmt/cumin/python/cumin/exchange.strings 2007-10-12 13:14:51 UTC (rev 1015)
+++ mgmt/cumin/python/cumin/exchange.strings 2007-10-12 13:30:11 UTC (rev 1016)
@@ -18,7 +18,8 @@
<th>Configuration</th>
<th>Status</th>
</tr>
-{items}
+
+ {items}
</table>
[ExchangeSet.item_html]
@@ -38,6 +39,7 @@
<fieldset>
<div class="field">{exchange_name}</div>
</fieldset>
+
<span class="legend">Type</span>
<fieldset>
<div class="field">
@@ -53,12 +55,13 @@
<em>Fan Out:</em> Lorem ipsum gloria dei ipso facto ad nauseum
</div>
</fieldset>
-{hidden_inputs}
+
+ {hidden_inputs}
</div>
<div class="foot">
<div style="display: block; float: left;"><button>Help</help></div>
-{cancel}
-{submit}
+ {cancel}
+ {submit}
</div>
</form>
<script defer="defer">
@@ -79,24 +82,14 @@
<dt>Type</dt><dd>{type}</dd>
</dl>
- <ul class="actions">
- <li><a href="{edit_exchange_href}">Edit This Exchange</a></li>
- <li><a href="{remove_exchange_href}">Remove This Exchange</a></li>
- </ul>
-
{tabs}
</div>
[ExchangeBindingSet.html]
-<ul class="actions">
- <li><a href="{add_binding_href}">Add Binding</a></li>
-</ul>
-
<table class="ExchangeBindingSet mobjects">
<tr>
<th>Queue</th>
<th>Routing Key</th>
- <th></th>
</tr>
{items}
@@ -106,5 +99,4 @@
<tr>
<td><a href="{item_href}">Queue '{item_name}'</a></td>
<td>{item_routing_key}</td>
- <td><a class="action" href="">Remove</a></td>
</tr>
18 years, 6 months
rhmessaging commits: r1015 - mgmt/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-10-12 09:14:51 -0400 (Fri, 12 Oct 2007)
New Revision: 1015
Modified:
mgmt/cumin/python/cumin/queue.py
mgmt/cumin/python/cumin/queue.strings
Log:
Makes the queue view read only.
Modified: mgmt/cumin/python/cumin/queue.py
===================================================================
--- mgmt/cumin/python/cumin/queue.py 2007-10-12 11:28:13 UTC (rev 1014)
+++ mgmt/cumin/python/cumin/queue.py 2007-10-12 13:14:51 UTC (rev 1015)
@@ -69,30 +69,12 @@
self.view = QueueView(app, "view")
self.add_child(self.view)
- self.edit = QueueEdit(app, "edit")
- self.add_child(self.edit)
-
- self.remove = QueueRemove(app, "remove")
- self.add_child(self.remove)
-
- self.binding_add = QueueBindingAdd(app, "binding_add")
- self.add_child(self.binding_add)
-
def set_queue(self, session, queue):
return self.param.set(session, queue)
def show_view(self, session):
return self.show_mode(session, self.view)
- def show_edit(self, session):
- return self.show_mode(session, self.edit)
-
- def show_remove(self, session):
- return self.show_mode(session, self.remove)
-
- def show_binding_add(self, session):
- return self.show_mode(session, self.binding_add)
-
def render_href(self, session, queue):
branch = session.branch()
self.show_view(branch)
@@ -158,25 +140,10 @@
else:
raise Exception()
- def render_edit_queue_href(self, session, queue):
- branch = session.branch()
- self.page().show_queue(branch, queue).show_edit(branch)
- return branch.marshal()
-
- def render_remove_queue_href(self, session, queue):
- branch = session.branch()
- self.page().show_queue(branch, queue).show_remove(branch)
- return branch.marshal()
-
class QueueBindingSet(ItemSet):
def render_title(self, session, queue):
return "Bindings (%i)" % len(queue.binding_items())
- def render_add_binding_href(self, session, queue):
- branch = session.branch()
- self.page().show_queue(branch, queue).show_binding_add(branch)
- return branch.marshal()
-
def get_items(self, session, queue):
return sorted(queue.binding_items(), cmp,
lambda x: x.get_exchange().name)
Modified: mgmt/cumin/python/cumin/queue.strings
===================================================================
--- mgmt/cumin/python/cumin/queue.strings 2007-10-12 11:28:13 UTC (rev 1014)
+++ mgmt/cumin/python/cumin/queue.strings 2007-10-12 13:14:51 UTC (rev 1015)
@@ -33,6 +33,7 @@
<fieldset>
<div class="field">{queue_name}</div>
</fieldset>
+
<span class="legend">Latency Tuning</span>
<fieldset>
<div class="field">
@@ -49,12 +50,13 @@
delays
</div>
</fieldset>
-{hidden_inputs}
+
+ {hidden_inputs}
</div>
<div class="foot">
<div style="display: block; float: left;"><button>Help</button></div>
-{cancel}
-{submit}
+ {cancel}
+ {submit}
</div>
</form>
<script defer="defer">
@@ -114,24 +116,14 @@
<dt>Latency Tuning</dt><dd>{latency_tuning}</dd>
</dl>
- <ul class="actions">
- <li><a href="{edit_queue_href}">Edit This Queue</a></li>
- <li><a href="{remove_queue_href}">Remove This Queue</a></li>
- </ul>
-
{tabs}
</div>
[QueueBindingSet.html]
- <ul class="actions">
- <li><a href="{add_binding_href}">Add Binding</a></li>
- </ul>
-
<table class="QueueBindingSet mobjects">
<tr>
<th>Exchange</th>
<th>Routing Key</th>
- <th></th>
</tr>
{items}
@@ -141,7 +133,6 @@
<tr>
<td><a href="{item_href}">Exchange '{item_name}'</a></td>
<td>{item_routing_key}</td>
- <td><a class="action" href="{item_remove_href}">Remove</a></td>
</tr>
[QueueBindingAdd.html]
@@ -152,16 +143,18 @@
<div class="body">
<span class="legend">Exchange</span>
<fieldset>{exchanges}</fieldset>
+
<span class="legend">Routing Key</span>
<fieldset>
<div class="field">{routing_key}</div>
</fieldset>
-{hidden_inputs}
+
+ {hidden_inputs}
</div>
<div class="foot">
<div style="display: block; float: left;"><button>Help</button></div>
-{cancel}
-{submit}
+ {cancel}
+ {submit}
</div>
</form>
<script defer="defer">
18 years, 6 months
rhmessaging commits: r1014 - store/trunk/cpp/tests/jrnl.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2007-10-12 07:28:13 -0400 (Fri, 12 Oct 2007)
New Revision: 1014
Modified:
store/trunk/cpp/tests/jrnl/tests.ods
store/trunk/cpp/tests/jrnl/wtests.csv
Log:
Corrections to write pipeline test cases
Modified: store/trunk/cpp/tests/jrnl/tests.ods
===================================================================
(Binary files differ)
Modified: store/trunk/cpp/tests/jrnl/wtests.csv
===================================================================
--- store/trunk/cpp/tests/jrnl/wtests.csv 2007-10-11 21:47:40 UTC (rev 1013)
+++ store/trunk/cpp/tests/jrnl/wtests.csv 2007-10-12 11:28:13 UTC (rev 1014)
@@ -244,34 +244,34 @@
219,"M",1,5000000,0,5000000,0,0,84,0,0,FALSE,FALSE,128,1,0,0,0,0,"1 dblk max"
220,"M",3,3000000,0,3000000,0,0,340,0,0,FALSE,FALSE,384,3,0,0,0,0,"3 dblks max"
221,"M",10,1600000,0,1600000,0,0,1236,0,0,FALSE,FALSE,1280,10,0,0,0,0,"10 dblks max"
-222,"M",30,6000000,0,6000000,0,0,3796,0,0,FALSE,FALSE,3840,30,0,0,0,0,"30 dblks max"
+222,"M",30,600000,0,600000,0,0,3796,0,0,FALSE,FALSE,3840,30,0,0,0,0,"30 dblks max"
223,"M",100,200000,0,200000,0,0,12756,0,0,FALSE,FALSE,12800,100,0,0,0,0,"100 dblks max"
224,"M",300,60000,0,60000,0,0,38356,0,0,FALSE,FALSE,38400,300,0,0,0,0,"300 dblks max"
225,"M",1000,20000,0,20000,0,0,127956,0,0,FALSE,FALSE,128000,1000,0,0,0,0,"1000 dblks max"
226,"M",1,5000000,0,5000000,0,0,100,1,100,FALSE,FALSE,244,2,0,0,0,0,"100 bytes xid max + 100 bytes data max [txn]"
227,"M",3,3000000,0,3000000,0,0,300,1,300,FALSE,FALSE,644,6,0,0,0,0,"300 bytes xid max + 300 bytes data max [txn]"
228,"M",10,1600000,0,1600000,0,0,1000,1,1000,FALSE,FALSE,2044,16,0,0,0,0,"1000 bytes xid max + 1000 bytes data max [txn]"
-229,"M",30,6000000,0,6000000,0,0,3000,1,3000,FALSE,FALSE,6044,48,0,0,0,0,"3000 bytes xid max + 3000 bytes data max [txn]"
+229,"M",30,600000,0,600000,0,0,3000,1,3000,FALSE,FALSE,6044,48,0,0,0,0,"3000 bytes xid max + 3000 bytes data max [txn]"
230,"M",100,200000,0,200000,0,0,10000,1,10000,FALSE,FALSE,20044,157,0,0,0,0,"10000 bytes xid max + 10000 bytes data max [txn]"
231,"M",300,60000,0,60000,0,0,30000,1,30000,FALSE,FALSE,60044,470,0,0,0,0,"30000 bytes xid max + 30000 bytes data max [txn]"
232,"M",1000,20000,0,20000,0,0,100000,1,100000,FALSE,FALSE,200044,1563,0,0,0,0,"100000 bytes xid max + 100000 bytes data max [txn]"
233,"M",1,5000000,0,5000000,0,0,84,0,0,TRUE,FALSE,128,1,32,1,0,0,"1 dblk max [deq]"
234,"M",3,3000000,0,3000000,0,0,340,0,0,TRUE,FALSE,384,3,32,1,0,0,"3 dblks max [deq]"
235,"M",10,1600000,0,1600000,0,0,1236,0,0,TRUE,FALSE,1280,10,32,1,0,0,"10 dblks max [deq]"
-236,"M",30,6000000,0,6000000,0,0,3796,0,0,TRUE,FALSE,3840,30,32,1,0,0,"30 dblks max [deq]"
+236,"M",30,600000,0,600000,0,0,3796,0,0,TRUE,FALSE,3840,30,32,1,0,0,"30 dblks max [deq]"
237,"M",100,200000,0,200000,0,0,12756,0,0,TRUE,FALSE,12800,100,32,1,0,0,"100 dblks max [deq]"
238,"M",300,60000,0,60000,0,0,38356,0,0,TRUE,FALSE,38400,300,32,1,0,0,"300 dblks max [deq]"
239,"M",1000,20000,0,20000,0,0,127956,0,0,TRUE,FALSE,128000,1000,32,1,0,0,"1000 dblks max [deq]"
240,"M",1,5000000,0,5000000,0,0,100,1,100,TRUE,FALSE,244,2,144,2,136,2,"100 bytes xid max + 100 bytes data max [deq txn]"
241,"M",3,3000000,0,3000000,0,0,300,1,300,TRUE,FALSE,644,6,344,3,336,3,"300 bytes xid max + 300 bytes data max [deq txn]"
242,"M",10,1600000,0,1600000,0,0,1000,1,1000,TRUE,FALSE,2044,16,1044,9,1036,9,"1000 bytes xid max + 1000 bytes data max [deq txn]"
-243,"M",30,6000000,0,6000000,0,0,3000,1,3000,TRUE,FALSE,6044,48,3044,24,3036,24,"3000 bytes xid max + 3000 bytes data max [deq txn]"
+243,"M",30,600000,0,600000,0,0,3000,1,3000,TRUE,FALSE,6044,48,3044,24,3036,24,"3000 bytes xid max + 3000 bytes data max [deq txn]"
244,"M",100,200000,0,200000,0,0,10000,1,10000,TRUE,FALSE,20044,157,10044,79,10036,79,"10000 bytes xid max + 10000 bytes data max [deq txn]"
245,"M",300,60000,0,60000,0,0,30000,1,30000,TRUE,FALSE,60044,470,30044,235,30036,235,"30000 bytes xid max + 30000 bytes data max [deq txn]"
246,"M",1000,20000,0,20000,0,0,100000,1,100000,TRUE,FALSE,200044,1563,100044,782,100036,782,"100000 bytes xid max + 100000 bytes data max [deq txn]"
,,,,,,,,,,,,,,,,,,,
"STANDARD PERFORMANCE BENCHMARK: 10,000,000 writes, data=212b (2 dblks)",,,,,,,,,,,,,,,,,,,
247,"M",1,10000000,0,10000000,0,212,212,0,0,FALSE,FALSE,256,2,0,0,0,0,"212 bytes data (2 dblks enq)"
-248,"M",1,10000000,0,10000000,0,212,212,256,256,FALSE,FALSE,512,4,0,0,0,0,"212 bytes data + 256 bytes xid (4 dblks enq)"
+248,"M",1,10000000,0,10000000,0,404,404,64,64,FALSE,FALSE,512,4,0,0,0,0,"212 bytes data + 64 bytes xid (3 dblks enq)"
249,"M",1,10000000,0,10000000,0,212,212,0,0,TRUE,FALSE,256,2,32,1,0,0,"212 bytes data (2 dblks enq + 1 dblk deq)"
-250,"M",1,10000000,0,10000000,0,212,212,256,256,TRUE,FALSE,512,4,300,3,292,3,"212 bytes data + 256 bytes xid (4 dblks enq + 3 dblks deq + 3 dblks txn)"
+250,"M",1,10000000,0,10000000,0,404,404,64,64,TRUE,FALSE,512,4,108,1,100,1,"212 bytes data + 64 bytes xid (3 dblks enq + 1 dblks deq + 1 dblks txn)"
18 years, 6 months
rhmessaging commits: r1013 - store/trunk/cpp/lib.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2007-10-11 17:47:40 -0400 (Thu, 11 Oct 2007)
New Revision: 1013
Modified:
store/trunk/cpp/lib/BdbMessageStore.h
Log:
Oops, left journal switch on my mistake, now off again
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2007-10-11 21:46:52 UTC (rev 1012)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2007-10-11 21:47:40 UTC (rev 1013)
@@ -113,7 +113,7 @@
void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/
string getJrnlDir(const char* queueName);
- static inline bool usingJrnl() {return true;} // make configurable
+ static inline bool usingJrnl() {return false;} // make configurable
string getJrnlBaseDir();
18 years, 6 months
rhmessaging commits: r1012 - in store/trunk/cpp/lib: jrnl and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: kpvdr
Date: 2007-10-11 17:46:52 -0400 (Thu, 11 Oct 2007)
New Revision: 1012
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
store/trunk/cpp/lib/BdbMessageStore.h
store/trunk/cpp/lib/JournalImpl.cpp
store/trunk/cpp/lib/JournalImpl.h
store/trunk/cpp/lib/jrnl/deq_rec.cpp
store/trunk/cpp/lib/jrnl/deq_rec.hpp
store/trunk/cpp/lib/jrnl/enq_rec.cpp
store/trunk/cpp/lib/jrnl/enq_rec.hpp
store/trunk/cpp/lib/jrnl/jcntl.cpp
store/trunk/cpp/lib/jrnl/jerrno.cpp
store/trunk/cpp/lib/jrnl/jrec.hpp
store/trunk/cpp/lib/jrnl/txn_map.cpp
store/trunk/cpp/lib/jrnl/txn_map.hpp
store/trunk/cpp/lib/jrnl/txn_rec.cpp
store/trunk/cpp/lib/jrnl/txn_rec.hpp
store/trunk/cpp/lib/jrnl/wmgr.cpp
Log:
Added is_synched() calls, other tidy-ups and bugfixes
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-10-11 21:46:52 UTC (rev 1012)
@@ -304,7 +304,7 @@
try
{
- jQueue->recover(prepared); // start recovery
+ jQueue->recover(prepared, key.id); // start recovery
recoverMessages(txn, registry, queue, prepared, messages);
jQueue->recover_complete(); // start journal.
} catch (journal::jexception& e) {
@@ -540,11 +540,11 @@
std::set<string> prepared;
collectPreparedXids(prepared);
- txn_lock_map enqueues;
- txn_lock_map dequeues;
//when using the async journal, it will abort unprepaired xids and populate the locked maps
if (!usingJrnl()){
- std::set<string> known;
+ txn_lock_map enqueues;
+ txn_lock_map dequeues;
+ std::set<string> known;
readXids(enqueueXidDb, known);
readXids(dequeueXidDb, known);
@@ -557,12 +557,19 @@
}
readLockedMappings(enqueueXidDb, enqueues);
readLockedMappings(dequeueXidDb, dequeues);
- }
-
- for (std::set<string>::iterator i = prepared.begin(); i != prepared.end(); i++) {
- txns.push_back(new PreparedTransaction(*i, enqueues[*i], dequeues[*i]));
- }
-
+ for (std::set<string>::iterator i = prepared.begin(); i != prepared.end(); i++) {
+ txns.push_back(new PreparedTransaction(*i, enqueues[*i], dequeues[*i]));
+ }
+ } else {
+ for (std::set<string>::iterator i = prepared.begin(); i != prepared.end(); i++) {
+ LockedMappings::shared_ptr enq_ptr;
+ enq_ptr.reset(new LockedMappings);
+ LockedMappings::shared_ptr deq_ptr;
+ deq_ptr.reset(new LockedMappings);
+ txns.push_back(new PreparedTransaction(*i, enq_ptr, deq_ptr));
+ }
+
+ }
}
void BdbMessageStore::readXids(Db& db, std::set<string>& xids)
Modified: store/trunk/cpp/lib/BdbMessageStore.h
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.h 2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/BdbMessageStore.h 2007-10-11 21:46:52 UTC (rev 1012)
@@ -113,7 +113,7 @@
void createJrnlQueue(const qpid::broker::PersistableQueue& queue);
string getJrnlDir(const qpid::broker::PersistableQueue& queue); //for exmaple /var/rhm/ + queueDir/
string getJrnlDir(const char* queueName);
- static inline bool usingJrnl() {return false;} // make configurable
+ static inline bool usingJrnl() {return true;} // make configurable
string getJrnlBaseDir();
Modified: store/trunk/cpp/lib/JournalImpl.cpp
===================================================================
--- store/trunk/cpp/lib/JournalImpl.cpp 2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/JournalImpl.cpp 2007-10-11 21:46:52 UTC (rev 1012)
@@ -37,12 +37,29 @@
void
JournalImpl::recover(std::deque<journal::data_tok*>* rd_dtokl, const journal::aio_cb rd_cb,
std::deque<journal::data_tok*>* wr_dtokl, const journal::aio_cb wr_cb,
- boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list) throw (journal::jexception)
+ boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list, u_int64_t queue_id)
+ throw (journal::jexception)
{
// Create list of prepared xids
std::vector<std::string> prep_xid_list;
for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list.begin();
- i != prep_tx_list.end(); i++)
+ i != prep_tx_list.end(); i++) {
prep_xid_list.push_back(i->xid);
+ }
+
journal::jcntl::recover(rd_dtokl, rd_cb, wr_dtokl, wr_cb, prep_xid_list);
+
+ // Populate PreparedTransaction lists from _tmap
+ for (bdbstore::PreparedTransaction::list::iterator i = prep_tx_list.begin();
+ i != prep_tx_list.end(); i++) {
+ journal::txn_data_list tdl = _tmap.get_tdata_list(i->xid);
+ assert(tdl.size()); // should never be empty
+ for (journal::tdl_itr tdl_itr = tdl.begin(); tdl_itr < tdl.end(); tdl_itr++) {
+ if (tdl_itr->_enq_flag) { // enqueue op
+ i->enqueues->add(queue_id, tdl_itr->_rid);
+ } else { // dequeue op
+ i->dequeues->add(queue_id, tdl_itr->_rid);
+ }
+ }
+ }
}
Modified: store/trunk/cpp/lib/JournalImpl.h
===================================================================
--- store/trunk/cpp/lib/JournalImpl.h 2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/JournalImpl.h 2007-10-11 21:46:52 UTC (rev 1012)
@@ -41,14 +41,14 @@
~JournalImpl();
void recover(std::deque<journal::data_tok*>* rd_dtokl, const journal::aio_cb rd_cb,
std::deque<journal::data_tok*>* wr_dtokl, const journal::aio_cb wr_cb,
- boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list)
- throw (journal::jexception);
+ boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
+ u_int64_t queue_id) throw (journal::jexception);
- void recover(boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list)
- throw (journal::jexception)
+ void recover(boost::ptr_list<bdbstore::PreparedTransaction>& prep_tx_list,
+ u_int64_t queue_id) throw (journal::jexception)
{
recover(&_aio_rd_cmpl_dtok_list, &aio_rd_callback, &_aio_wr_cmpl_dtok_list,
- &aio_wr_callback, prep_tx_list);
+ &aio_wr_callback, prep_tx_list, queue_id);
}
};
Modified: store/trunk/cpp/lib/jrnl/deq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/deq_rec.cpp 2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/deq_rec.cpp 2007-10-11 21:46:52 UTC (rev 1012)
@@ -319,11 +319,18 @@
}
const bool
-deq_rec::rcv_decode(hdr h, std::ifstream* ifsp, u_int32_t& rec_offs_dblks) throw (jexception)
+deq_rec::rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) throw (jexception)
{
- if (rec_offs_dblks) // Contunue decoding xid from previous decode call
+ if (rec_offs) // Contunue decoding xid from previous decode call
{
- // TODO
+ ifsp->read((char*)_buff + rec_offs, _deq_hdr._xidsize - rec_offs);
+ size_t size_read = ifsp->gcount();
+ if (size_read < _deq_hdr._xidsize - rec_offs)
+ {
+ assert(ifsp->eof());
+ rec_offs += size_read;
+ return false;
+ }
}
else // Start at beginning of record
{
@@ -347,24 +354,17 @@
}
// Decode xid
ifsp->read((char*)_buff, _deq_hdr._xidsize);
- if ((size_t)ifsp->gcount() == _deq_hdr._xidsize)
+ size_t size_read = ifsp->gcount();
+ if (size_read < _deq_hdr._xidsize)
{
- ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_deq_hdr) -
- _deq_hdr._xidsize);
- return true;
+ assert(ifsp->eof());
+ rec_offs = size_read;
+ return false;
}
- else
- ; // TODO
}
- else
- {
- // Igore rest of record
- rec_offs_dblks = rec_size_dblks();
- ifsp->ignore(rec_offs_dblks * JRNL_DBLK_SIZE - sizeof(_deq_hdr));
- return true;
- }
}
- return false;
+ ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_deq_hdr) - _deq_hdr._xidsize);
+ return true;
}
const size_t
Modified: store/trunk/cpp/lib/jrnl/deq_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/deq_rec.hpp 2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/deq_rec.hpp 2007-10-11 21:46:52 UTC (rev 1012)
@@ -78,8 +78,8 @@
const u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks,
u_int32_t max_size_dblks) throw (jexception);
// Decode used for recover
- const bool rcv_decode(hdr h, std::ifstream* ifsp, u_int32_t& rec_offs_dblks)
- throw (jexception);
+ const bool rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) throw (jexception);
+
inline const u_int64_t deq_rid() const { return _deq_hdr._deq_rid; }
const size_t get_xid(void** const xidpp);
std::string& str(std::string& str) const;
Modified: store/trunk/cpp/lib/jrnl/enq_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_rec.cpp 2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/enq_rec.cpp 2007-10-11 21:46:52 UTC (rev 1012)
@@ -428,11 +428,18 @@
}
const bool
-enq_rec::rcv_decode(hdr h, std::ifstream* ifsp, u_int32_t& rec_offs_dblks) throw (jexception)
+enq_rec::rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) throw (jexception)
{
- if (rec_offs_dblks) // Contunue decoding xid from previous decode call
+ if (rec_offs) // Contunue decoding xid from previous decode call
{
- // TODO
+ ifsp->read((char*)_buff + rec_offs, _enq_hdr._xidsize - rec_offs);
+ size_t size_read = ifsp->gcount();
+ if (size_read < _enq_hdr._xidsize - rec_offs)
+ {
+ assert(ifsp->eof());
+ rec_offs += size_read;
+ return false;
+ }
}
else // Start at beginning of record
{
@@ -462,23 +469,17 @@
}
// Decode xid
ifsp->read((char*)_buff, _enq_hdr._xidsize);
- if ((size_t)ifsp->gcount() == _enq_hdr._xidsize)
+ size_t size_read = ifsp->gcount();
+ if (size_read < _enq_hdr._xidsize)
{
- ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_enq_hdr) -
- _enq_hdr._xidsize);
- return true;
+ assert(ifsp->eof());
+ rec_offs = size_read;
+ return false;
}
- else
- ; // TODO
}
- else
- {
- // Igore rest of record
- ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_enq_hdr));
- return true;
- }
}
- return false;
+ ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_enq_hdr) - _enq_hdr._xidsize);
+ return true;
}
const size_t
Modified: store/trunk/cpp/lib/jrnl/enq_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/enq_rec.hpp 2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/enq_rec.hpp 2007-10-11 21:46:52 UTC (rev 1012)
@@ -89,8 +89,8 @@
const u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks,
u_int32_t max_size_dblks) throw (jexception);
// Decode used for recover
- const bool rcv_decode(hdr h, std::ifstream* ifsp, u_int32_t& rec_offs_dblks)
- throw (jexception);
+ const bool rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) throw (jexception);
+
const size_t get_xid(void** const xidpp);
const size_t get_data(void** const datapp);
inline const bool is_transient() const { return _enq_hdr.is_transient(); }
Modified: store/trunk/cpp/lib/jrnl/jcntl.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/jcntl.cpp 2007-10-11 21:46:52 UTC (rev 1012)
@@ -255,9 +255,9 @@
}
const bool
-jcntl::is_txn_synced(const std::string& /*xid*/) throw (jexception)
+jcntl::is_txn_synced(const std::string& xid) throw (jexception)
{
- return RHM_IORES_NOTIMPL;
+ return _tmap.is_txn_synced(xid);
}
const u_int32_t
@@ -375,7 +375,7 @@
jcntl::rcvr_get_next_record(u_int16_t& fid, std::ifstream* ifsp, rcvdat& rd,
const std::vector<std::string>& prep_txn_list) throw (jexception)
{
- u_int32_t dblks_read = 0;
+ size_t cum_size_read = 0;
bool done = false;
void* xidp = NULL;
hdr h;
@@ -389,7 +389,7 @@
enq_rec er;
while (!done)
{
- done = er.rcv_decode(h, ifsp, dblks_read);
+ done = er.rcv_decode(h, ifsp, cum_size_read);
jfile_cycle(fid, ifsp, rd, false);
}
rd._enq_cnt_list[fid]++;
@@ -417,7 +417,7 @@
deq_rec dr;
while (!done)
{
- done = dr.rcv_decode(h, ifsp, dblks_read);
+ done = dr.rcv_decode(h, ifsp, cum_size_read);
jfile_cycle(fid, ifsp, rd, false);
}
if (dr.xid_size())
@@ -458,7 +458,7 @@
txn_rec ar;
while (!done)
{
- done = ar.rcv_decode(h, ifsp, dblks_read);
+ done = ar.rcv_decode(h, ifsp, cum_size_read);
jfile_cycle(fid, ifsp, rd, false);
}
// Delete this txn from tmap, unlock any locked records in emap
@@ -493,7 +493,7 @@
txn_rec cr;
while (!done)
{
- done = cr.rcv_decode(h, ifsp, dblks_read);
+ done = cr.rcv_decode(h, ifsp, cum_size_read);
jfile_cycle(fid, ifsp, rd, false);
}
// Delete this txn from tmap, process records into emap
Modified: store/trunk/cpp/lib/jrnl/jerrno.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jerrno.cpp 2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/jerrno.cpp 2007-10-11 21:46:52 UTC (rev 1012)
@@ -204,8 +204,8 @@
// class enq_map, txn_map
_err_map[JERR_MAP_DUPLICATE] = std::string("JERR_MAP_DUPLICATE: "
- "Attempted to insert enqueue record using duplicate key.");
- _err_map[JERR_MAP_NOTFOUND] = std::string("JERR_MAP_NOTFOUND: Key not found in enqueue map.");
+ "Attempted to insert record into map using duplicate key.");
+ _err_map[JERR_MAP_NOTFOUND] = std::string("JERR_MAP_NOTFOUND: Key not found in map.");
_err_map[JERR_MAP_LOCKED] = std::string("JERR_MAP_LOCKED: "
"Record ID locked by a pending transaction.");
Modified: store/trunk/cpp/lib/jrnl/jrec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/jrec.hpp 2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/jrec.hpp 2007-10-11 21:46:52 UTC (rev 1012)
@@ -150,7 +150,7 @@
virtual const u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks,
u_int32_t max_size_dblks) throw (jexception) = 0;
- virtual const bool rcv_decode(hdr h, std::ifstream* ifsp, u_int32_t& rec_offs_dblks)
+ virtual const bool rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs)
throw (jexception) = 0;
virtual std::string& str(std::string& str) const = 0;
Modified: store/trunk/cpp/lib/jrnl/txn_map.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.cpp 2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/txn_map.cpp 2007-10-11 21:46:52 UTC (rev 1012)
@@ -44,7 +44,8 @@
txn_data_struct::txn_data_struct(const u_int64_t rid, const u_int16_t fid, const bool enq_flag):
_rid(rid),
_fid(fid),
- _enq_flag(enq_flag)
+ _enq_flag(enq_flag),
+ _aio_compl(false)
{}
txn_map::txn_map():
@@ -58,21 +59,24 @@
pthread_mutex_destroy(&_mutex);
}
-void
+const bool
txn_map::insert_txn_data(const std::string& xid, const txn_data& td) throw (jexception)
{
+ bool ok = true;
pthread_mutex_lock(&_mutex);
xmap_itr itr = _map.find(xid);
- pthread_mutex_unlock(&_mutex);
if (itr == _map.end()) // not found in map
{
txn_data_list list;
list.push_back(td);
std::pair<xmap_itr, bool> ret = _map.insert(xmap_param(xid, list));
- // TODO: check for failure here?
+ if (!ret.second) // duplicate
+ ok = false;
}
else
itr->second.push_back(td);
+ pthread_mutex_unlock(&_mutex);
+ return ok;
}
const txn_data_list
@@ -85,7 +89,7 @@
{
std::stringstream ss;
ss << std::hex << "xid=\"" << xid << "\"";
- throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map", "get_fid");
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map", "txn_data_list");
}
return itr->second;
}
@@ -100,7 +104,7 @@
pthread_mutex_unlock(&_mutex);
std::stringstream ss;
ss << std::hex << "xid=\"" << xid << "\"";
- throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map", "get_remove_fid");
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map", "get_remove_tdata_list");
}
txn_data_list list = itr->second;
_map.erase(itr);
@@ -118,11 +122,69 @@
{
std::stringstream ss;
ss << std::hex << "xid=\"" << xid << "\"";
- throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map", "get_fid");
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map", "get_rid_count");
}
return itr->second.size();
}
+const bool
+txn_map::is_txn_synced(const std::string& xid) throw (jexception)
+{
+ pthread_mutex_lock(&_mutex);
+ xmap_itr itr = _map.find(xid);
+ if (itr == _map.end()) // not found in map
+ {
+ pthread_mutex_unlock(&_mutex);
+ std::stringstream ss;
+ ss << std::hex << "xid=\"" << xid << "\"";
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map", "is_txn_synced");
+ }
+ txn_data_list list = itr->second;
+ bool is_synced = true;
+ for (tdl_itr litr = list.begin(); litr < list.end(); litr++)
+ {
+ if (!litr->_aio_compl)
+ {
+ is_synced = false;
+ break;
+ }
+ }
+ pthread_mutex_unlock(&_mutex);
+ return is_synced;
+}
+
+const bool
+txn_map::set_aio_compl(const std::string& xid, const u_int64_t rid)
+{
+ bool ok = true;
+ bool found = false;
+ pthread_mutex_lock(&_mutex);
+ xmap_itr itr = _map.find(xid);
+ if (itr == _map.end()) // not found in map
+ ok = false;
+ else
+ {
+ txn_data_list list = itr->second;
+ for (tdl_itr litr = list.begin(); litr < list.end(); litr++)
+ {
+ if (litr->_rid == rid)
+ {
+ found = true;
+ litr->_aio_compl = true;
+ break;
+ }
+ }
+ }
+ pthread_mutex_unlock(&_mutex);
+ if (ok && !found)
+ {
+ std::stringstream ss;
+ ss << std::hex << "xid=\"" << xid << "\" rid=" << rid;
+ throw jexception(jerrno::JERR_MAP_NOTFOUND, ss.str(), "txn_map", "set_aio_compl");
+ }
+ return ok;
+}
+
void
txn_map::xid_list(std::vector<std::string>& xv)
{
Modified: store/trunk/cpp/lib/jrnl/txn_map.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_map.hpp 2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/txn_map.hpp 2007-10-11 21:46:52 UTC (rev 1012)
@@ -53,9 +53,10 @@
struct txn_data_struct
{
- u_int64_t _rid;
- u_int16_t _fid;
- bool _enq_flag;
+ u_int64_t _rid; ///< Record id for this operation
+ u_int16_t _fid; ///< File id, to be used when transferring to emap on commit
+ bool _enq_flag; ///< If true, enq op, otherwise deq op
+ bool _aio_compl; ///< Initially false, set to true when AIO returns
txn_data_struct(const u_int64_t rid, const u_int16_t fid, const bool enq_flag);
};
typedef txn_data_struct txn_data;
@@ -76,10 +77,12 @@
txn_map();
~txn_map();
- void insert_txn_data(const std::string& xid, const txn_data& td) throw (jexception);
+ const bool insert_txn_data(const std::string& xid, const txn_data& td) throw (jexception);
const txn_data_list get_tdata_list(const std::string& xid) throw (jexception);
const txn_data_list get_remove_tdata_list(const std::string& xid) throw (jexception);
const u_int32_t get_rid_count(const std::string& xid) throw (jexception);
+ const bool is_txn_synced(const std::string& xid) throw (jexception);
+ const bool set_aio_compl(const std::string& xid, const u_int64_t rid);
inline void clear() { _map.clear(); }
inline const bool empty() const { return _map.empty(); }
inline const u_int16_t size() const { return (u_int16_t)_map.size(); }
Modified: store/trunk/cpp/lib/jrnl/txn_rec.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_rec.cpp 2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/txn_rec.cpp 2007-10-11 21:46:52 UTC (rev 1012)
@@ -314,11 +314,18 @@
}
const bool
-txn_rec::rcv_decode(hdr h, std::ifstream* ifsp, u_int32_t& rec_offs_dblks) throw (jexception)
+txn_rec::rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) throw (jexception)
{
- if (rec_offs_dblks) // Contunue decoding xid from previous decode call
+ if (rec_offs) // Contunue decoding xid from previous decode call
{
- // TODO
+ ifsp->read((char*)_buff + rec_offs, _txn_hdr._xidsize - rec_offs);
+ size_t size_read = ifsp->gcount();
+ if (size_read < _txn_hdr._xidsize - rec_offs)
+ {
+ assert(ifsp->eof());
+ rec_offs += size_read;
+ return false;
+ }
}
else // Start at beginning of record
{
@@ -339,15 +346,16 @@
}
// Decode xid
ifsp->read((char*)_buff, _txn_hdr._xidsize);
- if ((size_t)ifsp->gcount() == _txn_hdr._xidsize)
+ size_t size_read = ifsp->gcount();
+ if (size_read < _txn_hdr._xidsize)
{
- ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_txn_hdr) - _txn_hdr._xidsize);
- return true;
+ assert(ifsp->eof());
+ rec_offs = size_read;
+ return false;
}
- else
- ; // TODO
}
- return false;
+ ifsp->ignore(rec_size_dblks() * JRNL_DBLK_SIZE - sizeof(_txn_hdr) - _txn_hdr._xidsize);
+ return true;
}
const size_t
Modified: store/trunk/cpp/lib/jrnl/txn_rec.hpp
===================================================================
--- store/trunk/cpp/lib/jrnl/txn_rec.hpp 2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/txn_rec.hpp 2007-10-11 21:46:52 UTC (rev 1012)
@@ -78,8 +78,8 @@
const u_int32_t decode(hdr& h, void* rptr, u_int32_t rec_offs_dblks,
u_int32_t max_size_dblks) throw (jexception);
// Decode used for recover
- const bool rcv_decode(hdr h, std::ifstream* ifsp, u_int32_t& rec_offs_dblks)
- throw (jexception);
+ const bool rcv_decode(hdr h, std::ifstream* ifsp, size_t& rec_offs) throw (jexception);
+
const size_t get_xid(void** const xidpp);
std::string& str(std::string& str) const;
inline const size_t data_size() const { return 0; } // This record never carries data
Modified: store/trunk/cpp/lib/jrnl/wmgr.cpp
===================================================================
--- store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-10-11 21:09:42 UTC (rev 1011)
+++ store/trunk/cpp/lib/jrnl/wmgr.cpp 2007-10-11 21:46:52 UTC (rev 1012)
@@ -123,7 +123,6 @@
_enq_busy = true;
u_int64_t rid = initialize_rid(cont, dtokp);
-
_enq_rec.reset(rid, data_buff, tot_data_len, xid_ptr, xid_len, transient);
if (!cont)
{
@@ -262,8 +261,14 @@
}
u_int64_t rid = initialize_rid(cont, dtokp);
-
_deq_rec.reset(rid, dtokp->rid(), xid_ptr, xid_len);
+ if (!cont)
+ {
+ if (xid_len)
+ dtokp->set_xid(xid_ptr, xid_len);
+ else
+ dtokp->clear_xid();
+ }
bool done = false;
while (!done)
{
@@ -392,6 +397,14 @@
u_int64_t rid = initialize_rid(cont, dtokp);
_txn_rec.reset(RHM_JDAT_TXA_MAGIC, rid, xid_ptr, xid_len);
+ if (!cont)
+ {
+ dtokp->set_rid(rid);
+ if (xid_len)
+ dtokp->set_xid(xid_ptr, xid_len);
+ else
+ dtokp->clear_xid();
+ }
bool done = false;
while (!done)
{
@@ -516,6 +529,14 @@
u_int64_t rid = initialize_rid(cont, dtokp);
_txn_rec.reset(RHM_JDAT_TXC_MAGIC, rid, xid_ptr, xid_len);
+ if (!cont)
+ {
+ dtokp->set_rid(rid);
+ if (xid_len)
+ dtokp->set_xid(xid_ptr, xid_len);
+ else
+ dtokp->clear_xid();
+ }
bool done = false;
while (!done)
{
@@ -750,6 +771,8 @@
throw jexception(jerrno::JERR_WMGR_BADDTOKSTATE, ss.str(), "wmgr",
"get_events");
}
+ if (dtp->has_xid())
+ _tmap.set_aio_compl(dtp->xid(), dtp->rid());
_dtokl->push_back(dtp);
}
tot_data_toks += s;
@@ -766,6 +789,7 @@
}
else // File header writes have no pcb
{
+ // get fid from original file header record, update pointers for that fid
file_hdr* fhp = (file_hdr*)iocbp->u.c.buf;
u_int32_t fid = fhp->_fid;
nlfh* nlfhp = _wrfc.file_handle(fid);
18 years, 6 months
rhmessaging commits: r1011 - store/trunk/cpp/lib.
by rhmessaging-commits@lists.jboss.org
Author: cctrieloff
Date: 2007-10-11 17:09:42 -0400 (Thu, 11 Oct 2007)
New Revision: 1011
Modified:
store/trunk/cpp/lib/BdbMessageStore.cpp
Log:
use dtx methods
Modified: store/trunk/cpp/lib/BdbMessageStore.cpp
===================================================================
--- store/trunk/cpp/lib/BdbMessageStore.cpp 2007-10-11 20:20:39 UTC (rev 1010)
+++ store/trunk/cpp/lib/BdbMessageStore.cpp 2007-10-11 21:09:42 UTC (rev 1011)
@@ -390,8 +390,6 @@
JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
DataTokenImpl dtokp;
size_t readSize = 0;
-// char** buff = 0;
-// unsigned aio_sleep_cnt = 0;
unsigned msg_count=0;
bool read = true;
@@ -405,12 +403,8 @@
try {
while (read) {
-//std:: cout << "loop -- uses fixed size -> FIX <-" << std::endl;
-
-// const iores read_data(void** dbuf, size_t& dbsize, data_tok* const dtok)
rhm::journal::iores res = jc->read_data_record(&dbuff, dbuffSize, &xidbuff, xidbuffSize, transientFlag, &dtokp);
readSize = dtokp.dsize();
-// assert(readSize < buffSize); /// fail safe for hack...
switch (res)
{
@@ -430,9 +424,7 @@
Buffer contentBuff(data + contentOffset, contentSize);
msg->decodeContent(contentBuff);
}
- // TODO - change to prep list based on reading state from journal
- // -- add to prepared.enqued list..
- if (PreparedTransaction::isLocked(locked, queue->getPersistenceId(), dtokp.rid())) {
+ if (xidbuffSize > 0 && PreparedTransaction::isLocked(locked, queue->getPersistenceId(), dtokp.rid()) ) {
prepared[dtokp.rid()] = msg;
} else {
queue->recover(msg);
@@ -798,7 +790,12 @@
while (!written)
{
JournalImpl* jc = static_cast<JournalImpl*>(queue->getExternalQueueStore());
- rhm::journal::iores eres = jc->enqueue_data_record(buff, size, size, dtokp, /*txn->getXid(),*/ false);
+ rhm::journal::iores eres;
+ if (txn->getXid().empty()){
+ eres = jc->enqueue_data_record(buff, size, size, dtokp, false);
+ }else {
+ eres = jc->enqueue_txn_data_record(buff, size, size, dtokp, txn->getXid(), false);
+ }
switch (eres)
{
case rhm::journal::RHM_IORES_SUCCESS:
@@ -910,7 +907,11 @@
{
rhm::journal::iores dres;
try {
- dres = jc->dequeue_txn_data_record(ddtokp, tid);
+ if (tid.empty()){
+ dres = jc->dequeue_data_record(ddtokp);
+ } else {
+ dres = jc->dequeue_txn_data_record(ddtokp, tid);
+ }
} catch (rhm::journal::jexception& e) {
std::string str;
delete ddtokp;
18 years, 6 months
rhmessaging commits: r1010 - in mgmt/cumin/python: wooly and 1 other directory.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-10-11 16:20:39 -0400 (Thu, 11 Oct 2007)
New Revision: 1010
Modified:
mgmt/cumin/python/cumin/cluster.py
mgmt/cumin/python/cumin/cluster.strings
mgmt/cumin/python/cumin/page.py
mgmt/cumin/python/cumin/page.strings
mgmt/cumin/python/cumin/server.py
mgmt/cumin/python/cumin/server.strings
mgmt/cumin/python/wooly/forms.py
Log:
* Removes the top-level vhost tab for now.
* Adds config and stats to server.
Modified: mgmt/cumin/python/cumin/cluster.py
===================================================================
--- mgmt/cumin/python/cumin/cluster.py 2007-10-11 17:11:06 UTC (rev 1009)
+++ mgmt/cumin/python/cumin/cluster.py 2007-10-11 20:20:39 UTC (rev 1010)
@@ -88,8 +88,9 @@
self.tabs.add_child(self.ServerTab(app, "servers"))
self.tabs.add_child(self.VirtualHostTab(app, "vhosts"))
- self.tabs.add_child(self.ConfigurationTab(app, "config"))
- self.tabs.add_child(self.StatisticsTab(app, "stats"))
+ self.tabs.add_child(self.ConfigTab(app, "config"))
+ self.tabs.add_child(self.StatsTab(app, "stats"))
+ self.tabs.add_child(self.LogTab(app, "log"))
def render_title(self, session, cluster):
return "Cluster '%s'" % cluster.name
@@ -107,10 +108,15 @@
def get_items(self, session, cluster):
return sorted(cluster.virtual_host_items(), cmp, lambda x: x.name)
- class ConfigurationTab(Widget):
+ class ConfigTab(Widget):
def render_title(self, session, cluster):
return "Configuration"
- class StatisticsTab(Widget):
+ class StatsTab(Widget):
def render_title(self, session, cluster):
return "Statistics"
+
+ class LogTab(Widget):
+ def render_title(self, session, cluster):
+ return "Log Messages"
+
Modified: mgmt/cumin/python/cumin/cluster.strings
===================================================================
--- mgmt/cumin/python/cumin/cluster.strings 2007-10-11 17:11:06 UTC (rev 1009)
+++ mgmt/cumin/python/cumin/cluster.strings 2007-10-11 20:20:39 UTC (rev 1010)
@@ -81,14 +81,7 @@
<td><a class="action" href="">Remove</a></td>
</tr>
-[ConfigurationTab.css]
-.code {
- background-color: #f7f7f7;
- padding: 1em;
- border: 1px dotted #ddd;
-}
-
-[ConfigurationTab.html]
+[ConfigTab.html]
<ul class="actions">
<li><a href="">Edit This Configuration</a></li>
<li><a href="">Apply to All Cluster Servers</a></li>
@@ -112,4 +105,4 @@
vhost "test" {
}
}
-</pre>
\ No newline at end of file
+</pre>
Modified: mgmt/cumin/python/cumin/page.py
===================================================================
--- mgmt/cumin/python/cumin/page.py 2007-10-11 17:11:06 UTC (rev 1009)
+++ mgmt/cumin/python/cumin/page.py 2007-10-11 20:20:39 UTC (rev 1010)
@@ -160,7 +160,6 @@
self.tabs.add_child(self.ServerTab(app, "servers"))
self.tabs.add_child(self.ClusterTab(app, "clusters"))
- self.tabs.add_child(self.VirtualHostTab(app, "vhosts"))
def render_title(self, session, model):
return "Red Hat Messaging"
@@ -188,27 +187,6 @@
def render_title(self, session, model):
return self.clusters.render_title(session, model)
- class VirtualHostTab(TabSet):
- def __init__(self, app, name):
- super(MainView.VirtualHostTab, self).__init__(app, name)
-
- self.vhosts = self.VirtualHostTemplateSet(app, "vhosts")
- self.add_child(self.vhosts)
-
- self.groups = VirtualHostGroupTree(app, "groups")
- self.add_child(self.groups)
-
- def render_title(self, session, model):
- return "Functional Hosts"
-
- class VirtualHostTemplateSet(VirtualHostSet):
- def get_items(self, session, model):
- return model.get_virtual_host_templates()
-
- def render_title(self, session, model):
- return "Templates (%i)" % \
- len(model.get_virtual_host_templates())
-
class ServerBrowser(Widget):
def __init__(self, app, name):
super(ServerBrowser, self).__init__(app, name)
Modified: mgmt/cumin/python/cumin/page.strings
===================================================================
--- mgmt/cumin/python/cumin/page.strings 2007-10-11 17:11:06 UTC (rev 1009)
+++ mgmt/cumin/python/cumin/page.strings 2007-10-11 20:20:39 UTC (rev 1010)
@@ -335,6 +335,12 @@
background-color: #cfc;
}
+pre.code {
+ background-color: #f7f7f7;
+ padding: 1em;
+ border: 1px dotted #ddd;
+}
+
[CuminPage.html]
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.1//EN" "http://www.w3.org/TR/xhtml11/DTD/xhtml11.dtd">
@@ -379,32 +385,6 @@
{clusters}
-[VirtualHostTab.html]
-<ul class="radiotabs tabs">{tabs}</ul>
-<div class="mode">{mode}</div>
-
-[VirtualHostTemplateSet.html]
-<ul class="actions">
- <li><a href="">Add Template</a></li>
-</ul>
-
-<table class="mobjects">
- <tr>
- <th>Template</th>
- <th>Configuration</th>
- <th></th>
- </tr>
-
- {items}
-</table>
-
-[VirtualHostTemplateSet.item_html]
-<tr>
- <td>{item_link}</td>
- <td>10 queues, 5 exchanges</td>
- <td><a class="action" href="">Remove</a></td>
-</tr>
-
[ServerBrowser.css]
.ServerBrowser.groups {
float: left;
Modified: mgmt/cumin/python/cumin/server.py
===================================================================
--- mgmt/cumin/python/cumin/server.py 2007-10-11 17:11:06 UTC (rev 1009)
+++ mgmt/cumin/python/cumin/server.py 2007-10-11 20:20:39 UTC (rev 1010)
@@ -97,12 +97,11 @@
self.tabs = TabSet(app, "tabs")
self.add_child(self.tabs)
- self.vhosts = self.ServerVirtualHosts(app, "vhosts")
- self.tabs.add_child(self.vhosts)
+ self.tabs.add_child(self.VirtualHostTab(app, "vhosts"))
+ self.tabs.add_child(self.ConfigTab(app, "config"))
+ self.tabs.add_child(self.StatsTab(app, "stats"))
+ self.tabs.add_child(self.LogTab(app, "log"))
- self.log = self.ServerLog(app, "log")
- self.tabs.add_child(self.log)
-
def render_title(self, session, server):
return "Server '%s'" % server.name
@@ -125,15 +124,23 @@
def get_items(self, session, server):
return sorted(server.server_group_items(), cmp, lambda x: x.name)
- class ServerVirtualHosts(VirtualHostSet):
+ class VirtualHostTab(VirtualHostSet):
def render_title(self, session, server):
return "Functional Hosts (%i)" % len(server.virtual_host_items())
def get_items(self, session, server):
return sorted(server.virtual_host_items(), cmp, lambda x: x.name)
- class ServerLog(Widget):
+ class ConfigTab(Widget):
def render_title(self, session, server):
+ return "Configuration"
+
+ class StatsTab(Widget):
+ def render_title(self, session, server):
+ return "Statistics"
+
+ class LogTab(Widget):
+ def render_title(self, session, server):
return "Log Messages"
class ServerGroupTree(ItemTree):
Modified: mgmt/cumin/python/cumin/server.strings
===================================================================
--- mgmt/cumin/python/cumin/server.strings 2007-10-11 17:11:06 UTC (rev 1009)
+++ mgmt/cumin/python/cumin/server.strings 2007-10-11 20:20:39 UTC (rev 1010)
@@ -38,6 +38,31 @@
{tabs}
</div>
+[ConfigTab.html]
+<ul class="actions">
+ <li><a href="">Edit This Configuration</a></li>
+</ul>
+
+<pre class="code">
+server "server00" {
+ include "/usr/local/cluster/common.conf";
+
+ vhost "default" {
+ store: bdb;
+ transaction: journal;
+ }
+
+ vhost "devel" {
+ max-queue-depth: 10000;
+ store: memory;
+ transaction: memory;
+ }
+
+ vhost "test" {
+ }
+}
+</pre>
+
[ServerGroupTree.css]
ul.ServerGroupTree, ul.ServerGroupTree ul {
list-style: square;
Modified: mgmt/cumin/python/wooly/forms.py
===================================================================
--- mgmt/cumin/python/wooly/forms.py 2007-10-11 17:11:06 UTC (rev 1009)
+++ mgmt/cumin/python/wooly/forms.py 2007-10-11 20:20:39 UTC (rev 1010)
@@ -33,10 +33,6 @@
return writer.to_string()
-# XXX implement me
-#class FormInputItemSet(FormInput, ItemSet):
-# pass
-
class FormInput(Widget):
def __init__(self, app, name, form):
super(FormInput, self).__init__(app, name)
18 years, 6 months
rhmessaging commits: r1009 - mgmt/cumin/python/cumin.
by rhmessaging-commits@lists.jboss.org
Author: justi9
Date: 2007-10-11 13:11:06 -0400 (Thu, 11 Oct 2007)
New Revision: 1009
Modified:
mgmt/cumin/python/cumin/cluster.py
mgmt/cumin/python/cumin/cluster.strings
Log:
Adds a simulated config tab to cluster.
Modified: mgmt/cumin/python/cumin/cluster.py
===================================================================
--- mgmt/cumin/python/cumin/cluster.py 2007-10-11 16:42:35 UTC (rev 1008)
+++ mgmt/cumin/python/cumin/cluster.py 2007-10-11 17:11:06 UTC (rev 1009)
@@ -88,6 +88,8 @@
self.tabs.add_child(self.ServerTab(app, "servers"))
self.tabs.add_child(self.VirtualHostTab(app, "vhosts"))
+ self.tabs.add_child(self.ConfigurationTab(app, "config"))
+ self.tabs.add_child(self.StatisticsTab(app, "stats"))
def render_title(self, session, cluster):
return "Cluster '%s'" % cluster.name
@@ -104,3 +106,11 @@
def get_items(self, session, cluster):
return sorted(cluster.virtual_host_items(), cmp, lambda x: x.name)
+
+ class ConfigurationTab(Widget):
+ def render_title(self, session, cluster):
+ return "Configuration"
+
+ class StatisticsTab(Widget):
+ def render_title(self, session, cluster):
+ return "Statistics"
Modified: mgmt/cumin/python/cumin/cluster.strings
===================================================================
--- mgmt/cumin/python/cumin/cluster.strings 2007-10-11 16:42:35 UTC (rev 1008)
+++ mgmt/cumin/python/cumin/cluster.strings 2007-10-11 17:11:06 UTC (rev 1009)
@@ -60,9 +60,9 @@
</tr>
[VirtualHostTab.html]
- <ul class="actions">
- <li><a href="">Add Functional Host</a></li>
- </ul>
+<ul class="actions">
+ <li><a href="">Add Functional Host</a></li>
+</ul>
<table class="mobjects">
<tr>
@@ -80,3 +80,36 @@
<td>10 queues, 5 exchanges</td>
<td><a class="action" href="">Remove</a></td>
</tr>
+
+[ConfigurationTab.css]
+.code {
+ background-color: #f7f7f7;
+ padding: 1em;
+ border: 1px dotted #ddd;
+}
+
+[ConfigurationTab.html]
+<ul class="actions">
+ <li><a href="">Edit This Configuration</a></li>
+ <li><a href="">Apply to All Cluster Servers</a></li>
+</ul>
+
+<pre class="code">
+server "$SERVER" {
+ include "/usr/local/cluster/common.conf";
+
+ vhost "default" {
+ store: bdb;
+ transaction: journal;
+ }
+
+ vhost "devel" {
+ max-queue-depth: 10000;
+ store: memory;
+ transaction: memory;
+ }
+
+ vhost "test" {
+ }
+}
+</pre>
\ No newline at end of file
18 years, 6 months