[jboss-jira] [JBoss JIRA] (JGRP-1426) Group unable to accept new members: FLUSH stuck after MERGE_RSP timed out

David Hotham (JIRA) jira-events at lists.jboss.org
Wed Feb 22 07:30:39 EST 2012


    [ https://issues.jboss.org/browse/JGRP-1426?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12669905#comment-12669905 ] 

David Hotham edited comment on JGRP-1426 at 2/22/12 7:30 AM:
-------------------------------------------------------------

Hi, 

I decided that I should write a standalone repro for this.  I can't see how to attach files to this issue so I'm just going to paste the code below.  To repro:

-  I'm using loopback addresses 10.239.0.1 through 10.239.0.4 on my PC, and these are hardcoded.  You'll want to tweak them to match whatever network you use
-  run instances 10.239.0.1 through 10.239.0.3 directly; these will just sit around
-  run instance 10.239.0.4 in a wrapper (I'll include the python code that I've been using) which
   -  sits waiting for the file AOK.txt to be written
   -  when it is written, kills instance 10.239.0.4 and starts a new one.

Sooner or later you should see that D (instance 10.239.0.4) gets stuck outside the group, with one of the other members repeatedly logging "GMS flush by coordinator failed".  I usually see it happen after about 10 or 15 loops.

Here's the Java code:

{noformat}
import org.jgroups.*;
import org.jgroups.protocols.*;
import org.jgroups.protocols.pbcast.*;
import org.jgroups.stack.*;

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;

public class Test extends ReceiverAdapter {
    String localAddress;
    // Set by the JGroups callback thread and read by the main loop, hence volatile.
    volatile boolean sendMessage = false;

    public Test(String address) {
        localAddress =  address;
    }

    // Parameter is an IP address, expected to be one of 10.239.0.1 - 4.
    public static void main(String[] args) throws Exception {
        Test test = new Test(args[0]);
        test.start();
    }

    public void start() throws Exception {
        // Set up the stack, and channel.
        InetAddress inetAddr = InetAddress.getByName(localAddress);
        IpAddress member_a = new IpAddress("10.239.0.1", 60000);
        IpAddress member_b = new IpAddress("10.239.0.2", 60000);
        IpAddress member_c = new IpAddress("10.239.0.3", 60000);
        IpAddress member_d = new IpAddress("10.239.0.4", 60000);
        List<IpAddress> initialHosts = new ArrayList<IpAddress>();
        initialHosts.add(member_a);
        initialHosts.add(member_b);
        initialHosts.add(member_c);
        initialHosts.add(member_d);

        JChannel ch=new JChannel(false);
        ProtocolStack stack=new ProtocolStack();

        ch.setProtocolStack(stack);
        ch.setReceiver(this);

        stack.addProtocol((new TCP())
                .setValue("bind_port", 60000)
                .setValue("bind_addr", inetAddr));

        TCPPING tcpping = new TCPPING();
        tcpping.setInitialHosts(initialHosts);

        stack.addProtocol(tcpping)
                .addProtocol(new MERGE3())
                .addProtocol(new FD())
                .addProtocol(new VERIFY_SUSPECT())
                .addProtocol((new NAKACK2())
                        .setValue("use_mcast_xmit", false))
                .addProtocol(new UNICAST2())
                .addProtocol(new STABLE())
                .addProtocol(new GMS())
                .addProtocol(new UFC())
                .addProtocol(new MFC())
                .addProtocol(new FRAG2())
                .addProtocol(new SEQUENCER())
                .addProtocol(new FLUSH());
        for (Protocol protocol: stack.getProtocols()) {
            protocol.setProtocolStack(stack);
        }
        stack.init();

        // Connect to the cluster.
        ch.setName(localAddress);
        ch.connect("TestCluster");

        // Loop round, waiting to be told to send a message.
        // Not thread safe - we might not send as many messages as we 'should' - but good enough.
        boolean done = false;
        while (!done) {
            if (sendMessage) {
                sendMessage = false;
                ch.send(null, "This is a broadcast message from " + localAddress);
            }
            Thread.sleep(100);
        }
    }

    public void viewAccepted(View new_view) {
        System.out.println("view: " + new_view);
        
        // When D sees that the view has reached four members it creates the file AOK.txt, at which point we expect to
        // kill it.
        if (localAddress.equals("10.239.0.4") && (new_view.size() == 4)) {
            File f = new File("/tmp/AOK.txt");
            try {
                f.createNewFile();
            }
            catch (IOException e) {
                System.out.println("couldn't create AOK.txt");
            }
        }

        // Changes of view trigger message sending.
        sendMessage = true;
    }

    public void receive(Message msg) {
        System.out.println("<< " + msg.getObject() + " [" + msg.getSrc() + "]");
    }
}
{noformat}

And the Python wrapper:

{noformat}
import os
import random
import subprocess
import time

# Marker file written by member D once it sees a four-member view.
# Must match the path used in Test.viewAccepted().
AOK_FILE = "/tmp/AOK.txt"


class TestWrapper:
    def __init__(self):
        self.subProc = None

    def startMemberD(self):
        try:
            # Pass the command as a list so that it runs without a shell on any platform.
            self.subProc = subprocess.Popen(["java", "-jar", "target/Test.jar", "10.239.0.4"])
        except OSError as e:
            print(e)

    def killMemberD(self):
        if self.subProc is not None:
            self.subProc.kill()
            # Reap the killed process before starting the next one.
            self.subProc.wait()
        self.subProc = None

    def start(self):
        counter = 0
        while counter < 1000:
            counter += 1
            print("Starting member D on loop %d" % counter)
            self.startMemberD()

            # Wait for D to signal that all is well
            while not os.path.exists(AOK_FILE):
                time.sleep(1)

            # Kill D, wait a bit.
            os.remove(AOK_FILE)
            time.sleep(2)
            self.killMemberD()
            delay = random.randrange(0, 10)
            print("Sleeping for %d seconds" % delay)
            time.sleep(delay)

wrapper = TestWrapper()
wrapper.start()
{noformat}
                
> Group unable to accept new members: FLUSH stuck after MERGE_RSP timed out
> -------------------------------------------------------------------------
>
>                 Key: JGRP-1426
>                 URL: https://issues.jboss.org/browse/JGRP-1426
>             Project: JGroups
>          Issue Type: Bug
>    Affects Versions: 3.1
>            Reporter: David Hotham
>            Assignee: Vladimir Blagojevic
>             Fix For: 3.1
>
>
> We have two sub-groups, [B, C, A] and [D].
> (1) D sends a MERGE_REQ to B.
> 2012-02-18 22:15:03.888 [MergeTask,Clumpy Test Cluster,Member-D] TRACE org.jgroups.protocols.pbcast.Merger - Member-B: sending MERGE_REQ to [Member-D, Member-B]
> (2) B receives this.  There's some delay in processing it (I think because there's another merge or flush going on; but the exact reason doesn't matter for this issue).  When processing does start, B begins a flush.
> 2012-02-18 22:15:03.889 [OOB-2,Clumpy Test Cluster,Member-B] TRACE org.jgroups.protocols.TCP - received [dst: Member-B, src: Member-D (3 headers), size=0 bytes, flags=OOB], headers are GMS: GmsHeader[MERGE_REQ]: merge_id=Member-D::1, mbrs=[Member-B, Member-C, Member-A, Member-D], UNICAST2: DATA, seqno=1, conn_id=2, first, TCP: [channel_name=Clumpy Test Cluster]
> 2012-02-18 22:15:08.811 [OOB-2,Clumpy Test Cluster,Member-B] TRACE org.jgroups.protocols.pbcast.Merger - Member-B: got merge request from Member-D, merge_id=Member-D::1, mbrs=[Member-B, Member-A, Member-C, Member-D]
> (3) D times out waiting for the MERGE_RSP from B:
> 2012-02-18 22:15:08.889 [MergeTask,Clumpy Test Cluster,Member-D] TRACE org.jgroups.protocols.pbcast.Merger - Member-D: collected 1 merge response(s) in 5001 ms
> 2012-02-18 22:15:08.889 [MergeTask,Clumpy Test Cluster,Member-D] DEBUG org.jgroups.protocols.pbcast.Merger - merge leader Member-D did not get responses from all 2 partition coordinators; missing responses from 1 members, removing them from the merge
> (4) D completes the (failed) merge and broadcasts STOP_FLUSH:
> 2012-02-18 22:15:08.896 [MergeTask,Clumpy Test Cluster,Member-D] TRACE org.jgroups.protocols.pbcast.Merger - Member-D: received all ACKs (1) for merge view MergeView::[Member-D|1] [Member-D], subgroups=[Member-D|0] [Member-D] in 7ms
> 2012-02-18 22:15:08.896 [MergeTask,Clumpy Test Cluster,Member-D] TRACE org.jgroups.protocols.pbcast.GMS - Member-D: sending RESUME event
> 2012-02-18 22:15:08.897 [MergeTask,Clumpy Test Cluster,Member-D] DEBUG org.jgroups.protocols.pbcast.FLUSH - Member-D: received RESUME, sending STOP_FLUSH to all
> But, since B is not a member of D's view, B does not receive this message.  
> (5) Now all future merge attempts fail, because B is stuck in a flush:
> 2012-02-18 22:15:31.186 [OOB-1,Clumpy Test Cluster,Member-B] WARN  org.jgroups.protocols.pbcast.GMS - Member-B: GMS flush by coordinator failed
> 2012-02-18 22:15:54.380 [OOB-2,Clumpy Test Cluster,Member-B] WARN  org.jgroups.protocols.pbcast.GMS - Member-B: GMS flush by coordinator failed
> 2012-02-18 22:16:13.705 [OOB-1,Clumpy Test Cluster,Member-B] WARN  org.jgroups.protocols.pbcast.GMS - Member-B: GMS flush by coordinator failed
> Note that I have implemented a workaround in my application where I:
> -  start a long-ish timer in the block() callback; and stop that timer in unblock()
> -  if the timer is allowed to pop, call channel.stopFlush()
> This seems to be allowing the group to recover.  Any comments on whether this is a good or bad idea would be appreciated.
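
For reference, the workaround described at the end of the issue (start a timer in block(), stop it in unblock(), call channel.stopFlush() if it pops) might look roughly like the sketch below.  FlushWatchdog, blocked(), unblocked() and the timeout values are made-up names for illustration, not JGroups API.  The Runnable passed in is assumed to wrap channel.stopFlush() in the real application; here it only prints a line so that the sketch runs without JGroups.  Whether force-stopping a flush like this is safe is the open question raised in the issue.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of JGroups): runs onTimeout if unblocked()
// is not called within timeoutMs of blocked().
class FlushWatchdog {
    // Single daemon thread, so the watchdog never keeps the JVM alive.
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "flush-watchdog");
                t.setDaemon(true);
                return t;
            });
    private final long timeoutMs;
    private final Runnable onTimeout;
    private ScheduledFuture<?> pending;

    FlushWatchdog(long timeoutMs, Runnable onTimeout) {
        this.timeoutMs = timeoutMs;
        this.onTimeout = onTimeout;
    }

    // Call from the receiver's block() callback: (re)start the timer.
    synchronized void blocked() {
        cancelPending();
        pending = timer.schedule(onTimeout, timeoutMs, TimeUnit.MILLISECONDS);
    }

    // Call from the receiver's unblock() callback: the flush ended normally.
    synchronized void unblocked() {
        cancelPending();
    }

    private void cancelPending() {
        if (pending != null) {
            pending.cancel(false);
            pending = null;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        // Assumption: in the application this callback would call channel.stopFlush().
        FlushWatchdog watchdog =
                new FlushWatchdog(100, () -> System.out.println("flush timed out"));
        watchdog.blocked();   // block() seen, and no unblock() follows
        Thread.sleep(300);    // long enough for the 100 ms timer to pop
    }
}
```

The timeout needs to be comfortably longer than a normal flush takes, otherwise the watchdog would cut short flushes that were about to complete on their own.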
