[jboss-jira] [JBoss JIRA] (JGRP-2150) More efficient message adding and draining

Bela Ban (JIRA) issues at jboss.org
Thu Jan 5 11:37:02 EST 2017


    [ https://issues.jboss.org/browse/JGRP-2150?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13343852#comment-13343852 ] 

Bela Ban edited comment on JGRP-2150 at 1/5/17 11:36 AM:
---------------------------------------------------------

Better algorithm for a bounded MessageBatch:

{code:java}
protected final MessageBatch   batch=new MessageBatch(BATCH_SIZE);
protected final AtomicInteger  counter=new AtomicInteger(0);

    protected void add(MessageBatch mb) {
        // _add() (helper not shown) adds mb to this.batch and returns the number of messages added
        int size=_add(mb);
        if(size > 0)
            drain(size);
    }

    protected void drain(int num) {
        // only a thread that sees a previous value of 0 enters the delivery loop
        if(counter.getAndAdd(num) == 0) {
            final MessageBatch delivery_batch=new MessageBatch(num);
            int removed_msgs;
            do {
                delivery_batch.reset();
                // _transfer() (helper not shown) moves messages from this.batch into delivery_batch
                removed_msgs=_transfer(delivery_batch);
                //   deliver delivery_batch
            } while(counter.addAndGet(-removed_msgs) != 0);
        }
    }
{code}

Note that counter can temporarily become negative, e.g.:
* T1 adds 5 messages
* T2 adds 5 messages
* T1 increments counter from 0 to 5 (previous value 0), enters the do-while loop
* T1 removes 10 messages, delivers the batch of 10
* T1 decrements counter by 10: -5, so the while condition (counter != 0) is true
* T1 loops and removes 0 messages
* T2 increments counter from -5 to 0 and returns without draining, as the previous value was -5 rather than 0
* T1 decrements counter by 0 and hits the while clause, which now evaluates to false (counter == 0), so T1 terminates
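
The counter values in the interleaving above can be checked with a small single-threaded replay. This is only a sketch: the class name NegativeCounterTrace, the replay() method and the ArrayDeque standing in for the shared MessageBatch are assumptions made for illustration, not JGroups code.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

class NegativeCounterTrace {
    // Replays the steps of T1 and T2 in the listed order on one thread
    // and records the counter values observed along the way.
    static int[] replay() {
        ArrayDeque<String> batch = new ArrayDeque<>();   // hypothetical stand-in for the shared batch
        AtomicInteger counter = new AtomicInteger(0);

        for (int i = 0; i < 5; i++) batch.add("T1-" + i);   // T1 adds 5 messages
        for (int i = 0; i < 5; i++) batch.add("T2-" + i);   // T2 adds 5 messages

        int prevT1 = counter.getAndAdd(5);               // T1: 0 -> 5, previous value 0, so T1 drains
        int removed = batch.size();                      // T1 removes all 10 messages
        batch.clear();
        int afterFirst = counter.addAndGet(-removed);    // 5 - 10 = -5, non-zero, so T1 loops

        removed = batch.size();                          // T1 removes 0 messages (batch is empty)
        int prevT2 = counter.getAndAdd(5);               // T2: -5 -> 0, previous value -5, so T2 returns
        int afterSecond = counter.addAndGet(-removed);   // 0 - 0 = 0, loop condition is false, T1 stops

        return new int[] {prevT1, afterFirst, prevT2, afterSecond};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(replay()));   // prints [0, -5, -5, 0]
    }
}
```

The replay fixes one particular ordering; it does not explore other interleavings of T1 and T2.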


> More efficient message adding and draining
> ------------------------------------------
>
>                 Key: JGRP-2150
>                 URL: https://issues.jboss.org/browse/JGRP-2150
>             Project: JGroups
>          Issue Type: Enhancement
>            Reporter: Bela Ban
>            Assignee: Bela Ban
>              Labels: CR1
>             Fix For: 4.0
>
>
> In NAKACK2, UNICAST3 and in MaxOneThreadPerSenderPolicy, we have a pattern where one or more producers add messages (to a table in NAKACK2 and UNICAST3, or to a MessageBatch in MaxOneThreadPerSenderPolicy) and then only *a single thread* can remove and deliver messages up the stack.
> This requires synchronization around (1) determining the thread which will remove messages, (2) adding messages to the table (or batch) and (3) removing messages from the table or batch.
> Unit tests DrainTest and MessageBatchDrainTest show how a simple AtomicInteger can be used to do this.
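
A minimal sketch of the pattern described in the issue, with several producers and at most one thread delivering at a time. It is not the code of DrainTest or MessageBatchDrainTest: the class SingleDrainerSketch, its fields and the ConcurrentLinkedQueue standing in for the table or batch are all assumptions. For simplicity this variant counts add() calls in the AtomicInteger rather than messages.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

class SingleDrainerSketch {
    final Queue<Integer> queue = new ConcurrentLinkedQueue<>(); // hypothetical stand-in for table/batch
    final AtomicInteger counter = new AtomicInteger(0);         // add() calls not yet accounted for
    final AtomicInteger active = new AtomicInteger(0);          // threads currently delivering (check only)
    final AtomicInteger delivered = new AtomicInteger(0);       // total messages "delivered"
    volatile boolean overlap = false;                           // set if two threads deliver at once

    void add(int msg) {
        queue.add(msg);                                  // producers add concurrently
        if (counter.getAndIncrement() != 0)
            return;                                      // another thread is draining and will loop again
        do {
            if (active.incrementAndGet() > 1) overlap = true;
            int removed = 0;
            while (queue.poll() != null) removed++;      // "deliver" everything currently queued
            delivered.addAndGet(removed);
            active.decrementAndGet();
        } while (counter.decrementAndGet() != 0);        // adds arrived meanwhile: drain again
    }

    // Starts the given number of producers, each adding perThread messages, and waits for them.
    static SingleDrainerSketch run(int producers, int perThread) {
        SingleDrainerSketch s = new SingleDrainerSketch();
        Thread[] threads = new Thread[producers];
        for (int t = 0; t < producers; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) s.add(i);
            });
            threads[t].start();
        }
        try {
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return s;
    }

    public static void main(String[] args) {
        SingleDrainerSketch s = run(4, 10_000);
        System.out.println("delivered=" + s.delivered.get() + " overlap=" + s.overlap);
    }
}
```

In this variant the counter stays at 1 or above while a thread is inside the loop body, so a second producer can only take over as the draining thread after the first one has left the loop.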


