[
https://issues.jboss.org/browse/JGRP-1426?page=com.atlassian.jira.plugin....
]
David Hotham edited comment on JGRP-1426 at 2/22/12 7:30 AM:
-------------------------------------------------------------
Hi,
I decided that I should write a standalone repro for this. I can't see how to attach
files to this issue so I'm just going to paste the code below. To repro:
- I'm using loopback addresses 10.239.0.1 through 10.239.0.4 on my PC, and these are
hardcoded in. You'll want to tweak to match whatever network you use
- run instances 10.239.0.1 through 10.239.0.3 directly; these will just sit around
- run instance 10.239.0.4 in a wrapper (I'll include the python code that I've
been using) which
- sits waiting for the file AOK.txt to be written
- when it is written, kills instance 10.239.0.4 and starts a new one.
Sooner or later you should see that D gets stuck outside the group, with one of the other
members repeatedly logging "GMS flush by coordinator failed". I usually see it
happen after about 10 or 15 loops.
Here's the java code:
{noformat}
import org.jgroups.*;
import org.jgroups.protocols.*;
import org.jgroups.protocols.pbcast.*;
import org.jgroups.stack.*;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
public class Test extends ReceiverAdapter {
String localAddress;
Boolean sendMessage = false;
public Test(String address) {
localAddress = address;
}
// Parameter is an IP address, expected to be one of 10.239.0.1 - 4.
public static void main(String[] args) throws Exception {
Test test = new Test(args[0]);
test.start();
}
public void start() throws Exception {
// Set up the stack, and channel.
InetAddress inetAddr = InetAddress.getByName(localAddress);
IpAddress member_a = new IpAddress("10.239.0.1", 60000);
IpAddress member_b = new IpAddress("10.239.0.2", 60000);
IpAddress member_c = new IpAddress("10.239.0.3", 60000);
IpAddress member_d = new IpAddress("10.239.0.4", 60000);
List<IpAddress> initialHosts = new ArrayList<IpAddress>();
initialHosts.add(member_a);
initialHosts.add(member_b);
initialHosts.add(member_c);
initialHosts.add(member_d);
JChannel ch=new JChannel(false);
ProtocolStack stack=new ProtocolStack();
ch.setProtocolStack(stack);
ch.setReceiver(this);
stack.addProtocol((new TCP())
.setValue("bind_port", 60000)
.setValue("bind_addr", inetAddr));
TCPPING tcpping = new TCPPING();
tcpping.setInitialHosts(initialHosts);
stack.addProtocol(tcpping)
.addProtocol(new MERGE3())
.addProtocol(new FD())
.addProtocol(new VERIFY_SUSPECT())
.addProtocol((new NAKACK2())
.setValue("use_mcast_xmit", false))
.addProtocol(new UNICAST2())
.addProtocol(new STABLE())
.addProtocol(new GMS())
.addProtocol(new UFC())
.addProtocol(new MFC())
.addProtocol(new FRAG2())
.addProtocol(new SEQUENCER())
.addProtocol(new FLUSH());
for (Protocol protocol: stack.getProtocols()) {
protocol.setProtocolStack(stack);
}
stack.init();
// Connect to the cluster.
ch.setName(localAddress);
ch.connect("TestCluster");
// Loop round, waiting to be told to send a message.
// Not thread safe - we might not send as many messages as we 'should' -
but good enough.
Boolean done = false;
while (!done) {
if (sendMessage) {
sendMessage = false;
ch.send(null, "This is a broadcast message from " +
localAddress);
}
Thread.sleep(100);
}
}
public void viewAccepted(View new_view) {
System.out.println("view: " + new_view);
// When D sees that the view has reached four members it creates the file AOK.txt,
at which point we expect to
// kill it.
if (localAddress.equals("10.239.0.4") && (new_view.size() == 4))
{
File f = new File("/tmp/AOK.txt");
try {
f.createNewFile();
}
catch (IOException e) {
System.out.println("couldn't create AOK.txt");
}
}
// Changes of view trigger message sending.
sendMessage = true;
}
public void receive(Message msg) {
System.out.println("<< " + msg.getObject() + " [" +
msg.getSrc() + "]");
}
}
{noformat}
And the python wrapper:
{noformat}
import os
import random
import subprocess
import time
class TestWrapper:
def __init__(self):
self.subProc = None
def startMemberD(self):
try:
self.subProc = subprocess.Popen("java -jar target/Test.jar
10.239.0.4")
except OSError, e:
print e
def killMemberD(self):
try:
self.subProc.kill()
except:
os.kill(self.subProc.pid)
self.subProc = None
def start(self):
counter = 0
while counter < 1000:
counter += 1
print "Starting member D on loop %d" % counter
self.startMemberD()
# Wait for D to signal that all is well
while not os.path.exists("c:\\downloads\\AOK.txt"):
time.sleep(1)
# Kill D, wait a bit.
os.remove("/tmp/AOK.txt")
time.sleep(2)
self.killMemberD()
delay = random.randrange(0, 10)
print "Sleeping for %d seconds" % delay
time.sleep(delay)
wrapper = TestWrapper()
wrapper.start()
{noformat}
was (Author: dimbleby):
Hi,
I decided that I should write a standalone repro for this. I can't see how to attach
files to this issue so I'm just going to paste the code below. To repro:
- I'm using loopback addresses 10.239.0.1 through 10.239.0.4 on my PC, and these are
hardcoded in. You'll want to tweak to match whatever network you use
- run instances 10.239.0.1 through 10.239.0.3 directly; these will just sit around
- run instance 10.239.0.4 in a wrapper (I'll include the python code that I've
been using) which
- sits waiting for the file AOK.txt to be written
- when it is written, kills instance 10.239.0.4 and starts a new one.
Sooner or later you should see that D gets stuck outside the group, with one of the other
members repeatedly logging "GMS flush by coordinator failed". I usually see it
happen after about 10 or 15 loops.
Here's the java code:
{noformat}
import org.jgroups.*;
import org.jgroups.protocols.*;
import org.jgroups.protocols.pbcast.*;
import org.jgroups.stack.*;
import java.io.File;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.List;
public class Test extends ReceiverAdapter {
String localAddress;
Boolean sendMessage = false;
public Test(String address) {
localAddress = address;
}
// Parameter is an IP address, expected to be one of 10.239.0.1 - 4.
public static void main(String[] args) throws Exception {
Test test = new Test(args[0]);
test.start();
}
public void start() throws Exception {
// Set up the stack, and channel.
InetAddress inetAddr = InetAddress.getByName(localAddress);
IpAddress member_a = new IpAddress("10.239.0.1", 60000);
IpAddress member_b = new IpAddress("10.239.0.2", 60000);
IpAddress member_c = new IpAddress("10.239.0.3", 60000);
IpAddress member_d = new IpAddress("10.239.0.4", 60000);
List<IpAddress> initialHosts = new ArrayList<IpAddress>();
initialHosts.add(member_a);
initialHosts.add(member_b);
initialHosts.add(member_c);
initialHosts.add(member_d);
JChannel ch=new JChannel(false);
ProtocolStack stack=new ProtocolStack();
ch.setProtocolStack(stack);
ch.setReceiver(this);
stack.addProtocol((new TCP())
.setValue("bind_port", 60000)
.setValue("bind_addr", inetAddr));
TCPPING tcpping = new TCPPING();
tcpping.setInitialHosts(initialHosts);
stack.addProtocol(tcpping)
.addProtocol(new MERGE3())
.addProtocol(new FD())
.addProtocol(new VERIFY_SUSPECT())
.addProtocol((new NAKACK2())
.setValue("use_mcast_xmit", false))
.addProtocol(new UNICAST2())
.addProtocol(new STABLE())
.addProtocol(new GMS())
.addProtocol(new UFC())
.addProtocol(new MFC())
.addProtocol(new FRAG2())
.addProtocol(new SEQUENCER())
.addProtocol(new FLUSH());
for (Protocol protocol: stack.getProtocols()) {
protocol.setProtocolStack(stack);
}
stack.init();
// Connect to the cluster.
ch.setName(localAddress);
ch.connect("TestCluster");
// Loop round, waiting to be told to send a message.
// Not thread safe - we might not send as many messages as we 'should' -
but good enough.
Boolean done = false;
while (!done) {
if (sendMessage) {
sendMessage = false;
ch.send(null, "This is a broadcast message from " +
localAddress);
}
Thread.sleep(100);
}
}
public void viewAccepted(View new_view) {
System.out.println("view: " + new_view);
// When D sees that the view has reached four members it creates the file AOK.txt,
at which point we expect to
// kill it.
if (localAddress.equals("10.239.0.4") && (new_view.size() == 4))
{
File f = new File("/tmp/AOK.txt");
try {
f.createNewFile();
}
catch (IOException e) {
System.out.println("couldn't create AOK.txt");
}
}
// Changes of view trigger message sending.
sendMessage = true;
}
public void receive(Message msg) {
System.out.println("<< " + msg.getObject() + " [" +
msg.getSrc() + "]");
}
}
And the python wrapper:
import os
import random
import subprocess
import time
class TestWrapper:
def __init__(self):
self.subProc = None
def startMemberD(self):
try:
self.subProc = subprocess.Popen("java -jar target/Test.jar
10.239.0.4")
except OSError, e:
print e
def killMemberD(self):
try:
self.subProc.kill()
except:
os.kill(self.subProc.pid)
self.subProc = None
def start(self):
counter = 0
while counter < 1000:
counter += 1
print "Starting member D on loop %d" % counter
self.startMemberD()
# Wait for D to signal that all is well
while not os.path.exists("c:\\downloads\\AOK.txt"):
time.sleep(1)
# Kill D, wait a bit.
os.remove("/tmp/AOK.txt")
time.sleep(2)
self.killMemberD()
delay = random.randrange(0, 10)
print "Sleeping for %d seconds" % delay
time.sleep(delay)
wrapper = TestWrapper()
wrapper.start()
{noformat}
Group unable to accept new members: FLUSH stuck after MERGE_RSP timed out
-------------------------------------------------------------------------
Key: JGRP-1426
URL:
https://issues.jboss.org/browse/JGRP-1426
Project: JGroups
Issue Type: Bug
Affects Versions: 3.1
Reporter: David Hotham
Assignee: Vladimir Blagojevic
Fix For: 3.1
We have two sub-groups, [B, C, A] and [D].
(1) D sends a MERGE_REQ to B.
2012-02-18 22:15:03.888 [MergeTask,Clumpy Test Cluster,Member-D] TRACE
org.jgroups.protocols.pbcast.Merger - Member-B: sending MERGE_REQ to [Member-D, Member-B]
(2) B receives this. There's some delay in processing it (I think because
there's another merge or flush going on; but the exact reason doesn't matter for
this issue). When processing does start, B begins a flush.
2012-02-18 22:15:03.889 [OOB-2,Clumpy Test Cluster,Member-B] TRACE
org.jgroups.protocols.TCP - received [dst: Member-B, src: Member-D (3 headers), size=0
bytes, flags=OOB], headers are GMS: GmsHeader[MERGE_REQ]: merge_id=Member-D::1,
mbrs=[Member-B, Member-C, Member-A, Member-D], UNICAST2: DATA, seqno=1, conn_id=2, first,
TCP: [channel_name=Clumpy Test Cluster]
2012-02-18 22:15:08.811 [OOB-2,Clumpy Test Cluster,Member-B] TRACE
org.jgroups.protocols.pbcast.Merger - Member-B: got merge request from Member-D,
merge_id=Member-D::1, mbrs=[Member-B, Member-A, Member-C, Member-D]
(3) D times out waiting for the MERGE_RSP from B:
2012-02-18 22:15:08.889 [MergeTask,Clumpy Test Cluster,Member-D] TRACE
org.jgroups.protocols.pbcast.Merger - Member-D: collected 1 merge response(s) in 5001 ms
2012-02-18 22:15:08.889 [MergeTask,Clumpy Test Cluster,Member-D] DEBUG
org.jgroups.protocols.pbcast.Merger - merge leader Member-D did not get responses from all
2 partition coordinators; missing responses from 1 members, removing them from the merge
(4) D completes the (failed) merge and broadcasts STOP_FLUSH:
2012-02-18 22:15:08.896 [MergeTask,Clumpy Test Cluster,Member-D] TRACE
org.jgroups.protocols.pbcast.Merger - Member-D: received all ACKs (1) for merge view
MergeView::[Member-D|1] [Member-D], subgroups=[Member-D|0] [Member-D] in 7ms
2012-02-18 22:15:08.896 [MergeTask,Clumpy Test Cluster,Member-D] TRACE
org.jgroups.protocols.pbcast.GMS - Member-D: sending RESUME event
2012-02-18 22:15:08.897 [MergeTask,Clumpy Test Cluster,Member-D] DEBUG
org.jgroups.protocols.pbcast.FLUSH - Member-D: received RESUME, sending STOP_FLUSH to all
But, since B is not a member of D's view, B does not receive this message.
(5) Now all future merge attempts fail, because B is stuck in a flush:
2012-02-18 22:15:31.186 [OOB-1,Clumpy Test Cluster,Member-B] WARN
org.jgroups.protocols.pbcast.GMS - Member-B: GMS flush by coordinator failed
2012-02-18 22:15:54.380 [OOB-2,Clumpy Test Cluster,Member-B] WARN
org.jgroups.protocols.pbcast.GMS - Member-B: GMS flush by coordinator failed
2012-02-18 22:16:13.705 [OOB-1,Clumpy Test Cluster,Member-B] WARN
org.jgroups.protocols.pbcast.GMS - Member-B: GMS flush by coordinator failed
Note that I have implemented a workaround in my application where I:
- start a long-ish timer in the block() callback; and stop that timer in unblock()
- if the timer is allowed to pop, call channel.stopFlush()
This seems to be allowing the group to recover. Any comments on whether this is a good
or bad idea would be appreciated.
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators:
https://issues.jboss.org/secure/ContactAdministrators!default.jspa
For more information on JIRA, see:
http://www.atlassian.com/software/jira