[ https://issues.jboss.org/browse/JGRP-2150?page=com.atlassian.jira.plugin.... ]
Bela Ban edited comment on JGRP-2150 at 1/5/17 11:36 AM:
---------------------------------------------------------
Better algorithm for a bounded MessageBatch:
{code:java}
protected final MessageBatch batch=new MessageBatch(BATCH_SIZE);
protected final AtomicInteger counter=new AtomicInteger(0);

protected void add(MessageBatch mb) {
    int size=batch.add(mb); // adds mb to this.batch
    if(size > 0)
        drain(size);
}

protected void drain(int num) {
    // only the thread that finds counter at 0 becomes the drainer
    if(counter.getAndAdd(num) == 0) {
        final MessageBatch delivery_batch=new MessageBatch(num);
        int removed_msgs; // number of messages moved out of batch in one pass
        do {
            delivery_batch.reset();
            removed_msgs=_transfer(delivery_batch);
            // deliver delivery_batch
        } while(counter.addAndGet(-removed_msgs) != 0);
    }
}
{code}
Note that counter can temporarily get negative, e.g.
* T1 adds 5 messages
* T2 adds 5 messages
* T1 increments counter from 0 -> 5, enters do-while loop
* T1 removes 10 messages, delivers the batch of 10
* T1 decrements counter by 10: -5
* T1 loops and removes 0 messages
* T2 increments counter from -5 to 0: T2 returns without draining, as prev_value was -5 (not 0)
* T1 hits the while clause, which evaluates to false (counter == 0), and terminates
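The counter arithmetic in this trace can be replayed on a single thread. The following is only an illustrative sketch: the class and method names are made up for the example, and the "threads" are just the atomic operations written in the order listed above.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of JGroups: replays the T1/T2 interleaving sequentially
class NegativeCounterReplay {

    /** Returns {t1Prev, afterT1Add, afterFirstPass, t2Prev, afterT2Add, afterSecondPass} */
    static int[] replay() {
        AtomicInteger counter = new AtomicInteger(0);

        // T1 announces its 5 messages; a previous value of 0 makes T1 the drainer
        int t1Prev = counter.getAndAdd(5);
        int afterT1Add = counter.get();

        // T1 removes 10 messages (its own 5 plus the 5 that T2 added but has not announced yet)
        int afterFirstPass = counter.addAndGet(-10); // non-zero, so T1 loops

        // T2 announces its 5 messages; the previous value is non-zero, so T2 does not drain
        int t2Prev = counter.getAndAdd(5);
        int afterT2Add = counter.get();

        // T1's second pass removed 0 messages; the result is 0, so the do-while loop ends
        int afterSecondPass = counter.addAndGet(0);

        return new int[] {t1Prev, afterT1Add, afterFirstPass, t2Prev, afterT2Add, afterSecondPass};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(replay())); // prints [0, 5, -5, -5, 0, 0]
    }
}
```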
More efficient message adding and draining
------------------------------------------
Key: JGRP-2150
URL: https://issues.jboss.org/browse/JGRP-2150
Project: JGroups
Issue Type: Enhancement
Reporter: Bela Ban
Assignee: Bela Ban
Labels: CR1
Fix For: 4.0
In NAKACK2, UNICAST3 and in MaxOneThreadPerSenderPolicy, we have a pattern where one or
more producers add messages (to a table in NAKACK2 and UNICAST3, or to a MessageBatch in
MaxOneThreadPerSenderPolicy) and then only *a single thread* can remove and deliver
messages up the stack.
This requires synchronization around (1) determining the thread which will remove
messages, (2) adding messages to the table (or batch) and (3) removing messages from the
table or batch.
Unit tests DrainTest and MessageBatchDrainTest show how a simple AtomicInteger can be
used to do this.
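As a rough illustration of the pattern described above (many producers, at most one thread removing and delivering), here is a minimal self-contained sketch. It is not the code from DrainTest or MessageBatchDrainTest: it assumes a simplified variant that counts add() calls rather than messages, uses a ConcurrentLinkedQueue in place of the table or batch, and all names (SingleDrainer, adders, active, maxActive, delivered) are made up for the example. The active/maxActive fields are instrumentation only, to observe how many threads are draining at once.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: one AtomicInteger elects the single draining thread
class SingleDrainer {
    final Queue<Integer> queue = new ConcurrentLinkedQueue<>();
    final AtomicInteger adders = new AtomicInteger(0);    // add() calls not yet accounted for
    final AtomicInteger active = new AtomicInteger(0);    // instrumentation: threads draining now
    final AtomicInteger maxActive = new AtomicInteger(0); // instrumentation: highest value seen
    final AtomicInteger delivered = new AtomicInteger(0);

    void add(int msg) {
        queue.add(msg); // (2) add first, then announce
        if (adders.getAndIncrement() != 0)
            return;     // (1) another thread is draining and will pick this message up
        do {            // (3) only the elected thread removes and delivers
            maxActive.accumulateAndGet(active.incrementAndGet(), Math::max);
            while (queue.poll() != null)
                delivered.incrementAndGet(); // stand-in for delivering up the stack
            active.decrementAndGet();
        } while (adders.decrementAndGet() != 0); // more adds arrived meanwhile: drain again
    }

    /** Starts the given number of producer threads and waits for them to finish */
    static SingleDrainer run(int threads, int perThread) {
        SingleDrainer d = new SingleDrainer();
        Thread[] producers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            producers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++)
                    d.add(j);
            });
            producers[i].start();
        }
        try {
            for (Thread t : producers)
                t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return d;
    }

    public static void main(String[] args) {
        SingleDrainer d = run(4, 10_000);
        System.out.println("delivered=" + d.delivered.get() + ", maxActive=" + d.maxActive.get());
    }
}
```

In this variant the counter stays at 1 or above while a drainer is active, so no second thread can see 0 and start draining; an add that arrives during a pass forces at least one more pass before the drainer exits.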