[jboss-jira] [JBoss JIRA] (JGRP-1644) NAKACK2 violates FIFO property
Vadim Tsesko (JIRA)
jira-events at lists.jboss.org
Mon Jul 1 10:52:21 EDT 2013
[ https://issues.jboss.org/browse/JGRP-1644?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12786526#comment-12786526 ]
Vadim Tsesko commented on JGRP-1644:
------------------------------------
I tried, but I can't reproduce the issue outside the production system, which has a real workload and real message loss. Concurrent unit tests run on a local machine pass.
I can try to describe the system architecture.
We have 2 (logical) producers and about 40 (logical) consumers. Each one is a cluster member.
h3. Producers
Producers, which use JGroups {{Channel}}s, are constructed as follows (Scala + [Akka|http://akka.io]; error handling and stats stripped):
{code:java}
/**
 * Constructs TCP message publisher routing messages to remote actors
 * connected to `topic`
 *
 * @param ip interface IP address to bind to
 * @param port port to bind to
 * @param seed comma separated list of seed nodes:
 *             `localhost[2048],localhost[2049]`
 * @param topic message topic
 * @param selfName self name for distributed logging
 * @return publisher actor
 */
def newTCPPublisher(
    ip: String,
    port: Int,
    seed: String,
    topic: String,
    selfName: String): ActorRef = {
  val channel = new JChannel(TCPStack.config(ip, port, seed))
  channel.setName(selfName)
  channel.setReceiver(createWatcher(selfName))
  channel.connect(topic)
  registerChannel(channel)
  val publisher = system.actorOf(
    Props(new JGroupsPublisher(selfName, channel))
      .withDispatcher(PUBLISHER_DISPATCHER),
    s"tcp-publisher-$channel")
  publisher
}

private def createWatcher(channelName: String): Receiver =
  new Receiver {
    def viewAccepted(new_view: View) {
      log.info(s"New cluster view accepted on <$channelName>: $new_view")
    }
    def suspect(suspected_mbr: org.jgroups.Address) {
      log.warn(
        s"Suspecting cluster member from <$channelName>: $suspected_mbr")
    }
    def block() {
      log.debug(s"Blocked on <$channelName>")
    }
    def unblock() {
      log.debug(s"Unblocked on <$channelName>")
    }
    def receive(msg: Message) {}
    def getState(output: OutputStream) {}
    def setState(input: InputStream) {}
  }

private def registerChannel(channel: JChannel) {
  val servers = MBeanServerFactory.findMBeanServer(null)
  if (servers != null && servers.size() > 0) {
    JmxConfigurator.registerChannel(channel, servers.get(0), null)
  }
}
{code}
The {{TCPStack.config()}} template is shown in the issue description. The {{JGroupsPublisher}} actor is returned to users for publishing messages. It runs on a single thread; here is the code:
{code:java}
class JGroupsPublisher(
    channelName: String,
    channel: JChannel) extends InstrumentedActor {
  protected def process = {
    case msg: Object =>
      val m: Option[Message] =
        try {
          if (channel.isConnected) {
            Some(new Message(null, msg))
          } else {
            None
          }
        } catch {
          case e: Exception =>
            None
        }
      m.map(message => {
        try {
          channel.send(message)
        } catch {
          case e: Exception =>
            log.error(
              e,
              s"Could not send message <$message> to channel <$channelName>")
        }
      })
  }
}
{code}
Before publishing a message, the only user (running on a single thread) assigns it a sequential number, prepends its own *sender id*, and puts it into the queue of the {{JGroupsPublisher}} instance:
{code:java}
/** Message to send serialized partition index
 */
case class SendPartitionIndexRequest(
    partition: PartitionKey,
    payload: Array[Byte]) {
  override def toString =
    "SendPartitionIndexRequest(partition = %d, payload = %d bytes)"
      .format(partition.id, payload.length)
}

/** Sends serialized partition index to consumers.
 *
 * WARNING! This actor must not be pooled!
 *
 * @param socket socket to send messages to
 * @param senderId sender id to prepend to message being sent (with the message counter)
 */
class PartitionIndexSender(
    socket: ActorRef,
    senderId: String)
  extends InstrumentedActor {
  protected def process = {
    case SendPartitionIndexRequest(partition, payload) =>
      socket ! toZMQMessage(payload)
  }

  def toZMQMessage(payload: Array[Byte]) =
    ZMQMessage(Seq(
      Frame(IndexChannel.PARTITION_INDEX_TOPIC),
      Frame(senderId),
      // The counter lives in the companion object, so it must be qualified
      Frame(PartitionIndexSender.counter.getAndIncrement.toString),
      Frame(payload)))
}

object PartitionIndexSender {
  val counter = new AtomicInteger
}
{code}
{{JGroupsPublisher}} extracts messages from the queue one by one and multicasts them using JGroups {{Channel.send(message)}} in method {{process()}}.
h3. Consumers
TCP consumers are constructed the following way:
{code:java}
/**
 * Constructs TCP message subscriber receiving messages from remote actors
 * publishing to `topic`
 *
 * @param ip interface IP address to bind to
 * @param port port to bind to
 * @param seed comma separated list of seed nodes:
 *             `localhost[2048],localhost[2049]`
 * @param topic message topic
 * @param selfName self name for distributed logging
 * @param listener listener to route messages to
 */
def newTCPSubscriber(
    ip: String,
    port: Int,
    seed: String,
    topic: String,
    selfName: String,
    listener: ActorRef) {
  val channel = new JChannel(TCPStack.config(ip, port, seed))
  channel.setName(selfName)
  channel.setReceiver(createReceiver(selfName, listener))
  channel.connect(topic)
  registerChannel(channel)
}

private def createReceiver(
    channelName: String,
    listener: ActorRef): Receiver =
  new Receiver {
    def viewAccepted(new_view: View) {
      log.info(s"New cluster view accepted from <$channelName>: $new_view")
    }
    def suspect(suspected_mbr: org.jgroups.Address) {
      log.warn(
        s"Suspecting cluster member from <$channelName>: $suspected_mbr")
    }
    def block() {
      log.debug(s"Blocked on <$channelName>")
    }
    def unblock() {
      log.debug(s"Unblocked on <$channelName>")
    }
    def receive(msg: Message) {
      listener ! msg.getObject
    }
    def getState(output: OutputStream) {}
    def setState(input: InputStream) {}
  }
{code}
A user passes an {{ActorRef}} when calling {{newTCPSubscriber()}}. This actor runs on a single thread, and all received messages are forwarded to it in order:
{code:java}
abstract class TopicMessageSubscriber(
    topics: Set[String])
  extends InstrumentedActor {
  def this(topicList: List[String]) = this(topicList.toSet)

  protected def monitorSkippedMessages: Boolean = true
  protected def bearableMessageLossPerInterval: Int = 1
  protected def bearableMessageLossInterval: FiniteDuration = 1.hour

  protected def process = {
    case msg: ZMQMessage
      if topics.contains(msg.firstFrameAsString) =>
      val data = TopicMessage(msg)
      TopicMessageSubscriber.updateCounter(
        data.sender,
        msg.firstFrameAsString,
        data.counter,
        bearableMessageLossPerInterval)
      processMessage(data)
  }

  protected def processMessage(msg: TopicMessage)
}
object TopicMessageSubscriber {
  private val log = LoggerFactory.getLogger(this.getClass)
  private val counters =
    new ConcurrentHashMap[(String, String), AtomicInteger]()
  // Stores currentTimeMillis of each message lost
  // INVARIANT: a value List always has size bearableMessageLossPerTopicPerHour
  private val skipped =
    new ConcurrentHashMap[(String, String), List[Long]]()

  def updateCounter(
      sender: String,
      topic: String,
      counter: Int,
      bearableMessageLossPerTopicPerHour: Int): Int = {
    val key = (sender, topic)
    if (counters.putIfAbsent(
      key,
      new AtomicInteger(counter)) == null)
      log.info(
        "Received the first index message from sender <%s>, topic '%s'"
          .format(sender, topic))
    val c = counters.get(key)
    val previous = c.getAndSet(counter)
    val diff = counter - previous - 1
    // Registering a message loss
    if (diff > 0) {
      log.warn(
        ("Detected message loss from sender <%s>, topic '%s'. " +
          "Previous value %d, current value %d.")
          .format(sender, topic, previous, counter))
      val now = System.currentTimeMillis()
      val lostMessages: List[Long] =
        (List.fill(diff.min(bearableMessageLossPerTopicPerHour))(now) :::
          (if (skipped.containsKey(key))
            skipped.get(key)
          else
            List.fill(bearableMessageLossPerTopicPerHour)(-1L)))
          .take(bearableMessageLossPerTopicPerHour)
      skipped.put(key, lostMessages)
    }
    math.max(diff, 0)
  }
}
{code}
Here you can trace the logic of checking each message sequence number.
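To illustrate how this check reacts to a reordering, here is a simplified, self-contained sketch. It is not the production code: the object {{GapCheck}} and its method {{gaps}} are hypothetical names, and only the {{counter - previous - 1}} comparison from {{updateCounter}} is kept. It is fed the counters from the out-of-order sequence in the issue description.
{code:java}
// Hypothetical helper for illustration only; not part of the system above.
object GapCheck {
  /** For each counter after the first, returns `counter - previous - 1`,
    * the same value that `updateCounter` computes as `diff`. */
  def gaps(counters: Seq[Int]): Seq[Int] =
    counters.zip(counters.drop(1)).map { case (previous, counter) =>
      counter - previous - 1
    }

  def main(args: Array[String]): Unit = {
    // Sequence observed in the issue description (#1202 arrives before #1201)
    val received = Seq(1198, 1199, 1200, 1202, 1201, 1203, 1204)
    val diffs = gaps(received)
    println(diffs.mkString(", "))   // prints: 0, 0, 1, -2, 1, 0
    println(diffs.count(_ > 0))     // prints: 2
  }
}
{code}
Under these assumptions a single swapped pair produces two positive diffs (at #1202 and again at #1203), so a check of this shape reports a reordering as message loss even though nothing was dropped.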
h3. Behavior
As a result:
* When we use {{NAKACK2}} in {{TCPStack}}, a message reordering occurs approximately once every 20-30 minutes, and each one is logged as {{Detected message loss from sender...}}
* When we use {{NAKACK}} in {{TCPStack}} everything works fine
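Since the log line says "message loss" while the symptom is a reordering, it can help to tell the two apart on the receiving side. The following is a minimal sketch under stated assumptions, not code from the system above: {{OrderTracker}}, {{record}} and {{outstanding}} are hypothetical names, and counters are assumed to be strictly increasing per sender.
{code:java}
import scala.collection.mutable

// Hypothetical helper for illustration only; not part of the system above.
final class OrderTracker {
  private var highest: Option[Int] = None
  private val missing = mutable.Set.empty[Int]

  /** Records `counter`. Returns true if it arrived late, i.e. a counter
    * at least as high had already been seen (this also covers duplicates). */
  def record(counter: Int): Boolean = highest match {
    case Some(h) if counter <= h =>
      missing -= counter                     // the skipped message showed up
      true
    case Some(h) =>
      missing ++= ((h + 1) until counter)    // remember every skipped counter
      highest = Some(counter)
      false
    case None =>
      highest = Some(counter)
      false
  }

  /** Counters that were skipped and have not arrived so far. */
  def outstanding: Set[Int] = missing.toSet
}
{code}
Feeding it the sequence from the issue description (1198, 1199, 1200, 1202, 1201, 1203, 1204) flags only #1201 as late and leaves {{outstanding}} empty, which corresponds to a reordering with no loss.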
> NAKACK2 violates FIFO property
> ------------------------------
>
> Key: JGRP-1644
> URL: https://issues.jboss.org/browse/JGRP-1644
> Project: JGroups
> Issue Type: Bug
> Affects Versions: 3.3.1
> Environment: Ubuntu 12.04 LTS, kernel 3.2.0-24-generic #39-Ubuntu SMP Mon May 21 16:52:17 UTC 2012 x86_64 x86_64 x86_64 GNU/Linux, Java 1.7.0_21
> Reporter: Vadim Tsesko
> Assignee: Bela Ban
> Fix For: 3.4
>
> Attachments: TCP-NAKACK2.png, UDP-NAKACK2-NAKACK.png
>
>
> In the [documentation|http://www.jgroups.org/manual/html/protlist.html#ReliableMessageTransmission] it is stated that:
> {quote}
> NAKACK provides reliable delivery and FIFO (= First In First Out) properties for messages sent to all nodes in a cluster.
> {quote}
> and
> {quote}
> NAKACK2 was introduced in 3.1 and is a successor to NAKACK (at some point it will replace NAKACK). It has the same properties as NAKACK, but its implementation is faster and uses less memory, plus it creates fewer tasks in the timer.
> {quote}
> I have observed that sometimes multicast messages are received out of order.
> We use the following protocol stack configuration:
> {code:xml}
> <config xmlns="urn:org:jgroups"
>         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>         xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/JGroups-3.3.xsd">
>     <UDP bind_addr="match-interface:$interface"
>          bind_interface="$interface"
>          bind_port="$unicastPort"
>          ip_ttl="128"
>          mcast_addr="$multicastGroup"
>          mcast_port="$multicastPort"
>          singleton_name="udp-transport"/>
>     <PING return_entire_cache="true"
>           break_on_coord_rsp="false"/>
>     <MERGE3/>
>     <FD_SOCK/>
>     <FD_ALL/>
>     <VERIFY_SUSPECT/>
>     <BARRIER/>
>     <pbcast.NAKACK print_stability_history_on_failed_xmit="true"/>
>     <pbcast.STABLE/>
>     <pbcast.GMS/>
>     <MFC max_credits="8M"/>
>     <FRAG2/>
>     <RSVP/>
> </config>
> {code}
> As you can see, mostly we use the defaults.
> The messages are being sent from a single thread using the following code:
> {code:java}
> channel.send(new Message(null, msg))
> {code}
> Message sizes range from 300 KB to 4 MB. The message rate is 1-5 messages per second.
> We have a sequential counter inside each message being sent. Sometimes the messages are received out of order, for instance:
> {code}
> #1198
> #1199
> #1200
> #1202
> #1201
> #1203
> #1204
> {code}
> If we replace {{NAKACK2}} with {{NAKACK}}, the problem disappears: everything works as expected (FIFO).
> If we replace the JGroups-based transport with a ZeroMQ-based transport (actually running over EPGM and in use for a year), everything works as expected (FIFO). I mention this to show that there are no bugs in our message numbering logic.