[infinispan-dev] Infinispan and change data capture

Radim Vansa rvansa at redhat.com
Fri Dec 9 16:43:15 EST 2016


On 12/09/2016 06:25 PM, Emmanuel Bernard wrote:
> Randall and I had a chat on $subject. Here is a proposal worth
> exploring as it is very lightweight on Infinispan's code.
>
> Does an operation has a unique id sahred by the master and replicas?
> If not could we add that?

Yes, each modification has unique CommandInvocationId in non-tx cache, 
and there are GlobalTransaction ids in tx-caches.

>
> The proposal itself:
>
> The total order would not be global but per key.
> Each node has a Debezium connector instance embedded that listens to the
> operations happening (primary and replicas alike).
> All of this process is happening async compared to the operation.
> Per key, a log of operations is kept in memory (it contains the key, the
> operation, the operation unique id and a ack status.
> If on the key owner, the operation is written by the Debezium connector
> to Kafka when it has been acked (whatever that means is where I'm less
> knowledgable - too many bi-cache, tri-cache and quadri latency mixed in
> my brain).

And the ack is what you have to define. All Infinispan gives you is

operation was confirmed on originator => all owners (primary + backups) 
have stored the value

If you send the ack from originator to primary, it could be lost (when 
originator crashes).
If you write Kafka on originator, you don't have any order, and the 
update could be lost by crashing before somehow replicating to Kafka.
If you write Kafka on primary, you need the ack from all backups (minor 
technical difficulty), and if primary crashes after it has sent the 
update to all backups, data is effectively modified but Kafka is not. 
The originator has to detect primary crashing to retry - so probably the 
primary could only send the ack to originator after it gets ack from all 
backups AND updates Kafka. But this is exactly what Triangle eliminated. 
And you still have the problem when originator crashes as well, but at 
least you're resilient to single node (primary) failure.

So you probably intend to forget any "acks" and as soon as primary 
executes the write locally, just push it to Kafka. No matter the actual 
"outcome" of the operation. E.g. with putIfAbsent there could be 
topology change during replication to backup, which will cause the 
operation to be retried (from the originator). In the meantime, there 
would be another write, and the retried putIfAbsent will fail. You will 
have one successful and one unsuccessful putIfAbsent in the log, with 
the same ID.

> On a replica, the kafka partition is read regularly to clear the
> in-memory log from operations stored in Kafka
> If the replica becomes the owner, it reads the kafka partition to see
> what operations are already in and writes the missing ones.

Backup owner (becoming primary owner) having the write locally logged 
does not mean that operation was successfully finished on all owners.

>
> There are a few cool things:
> - few to no change in what Infinispan does
> - no global ordering simplifies things and frankly is fine for most
>    Debezium cases. In the end a global order could be defined after the
>    fact (by not partitioning for example). But that's a pure downstream
>    concern.
> - everything is async compared to the Infinispan ops
> - the in-memory log can remain in memory as it is protected by replicas
> - the in-memory log is self cleaning thanks to the state in Kafka
>
> Everyone wins. But it does require some sort of globally unique id per
> operation to dedup.

And a suitable definition for Debezium if the operation "happened" or not.

Radim

>
> Emmanuel
>
>
> On Fri 16-12-09 10:08, Randall Hauch wrote:
>>> On Dec 9, 2016, at 3:13 AM, Radim Vansa <rvansa at redhat.com> wrote:
>>>
>>> On 12/08/2016 10:13 AM, Gustavo Fernandes wrote:
>>>> I recently updated a proposal [1] based on several discussions we had
>>>> in the past that is essentially about introducing an event storage
>>>> mechanism (write ahead log) in order to improve reliability, failover
>>>> and "replayability" for the remote listeners, any feedback greatly
>>>> appreciated.
>>> Hi Gustavo,
>>>
>>> while I really like the pull-style architecture and reliable events, I
>>> see some problematic parts here:
>>>
>>> 1) 'cache that would persist the events with a monotonically increasing id'
>>>
>>> I assume that you mean globally (for all entries) monotonous. How will
>>> you obtain such ID? Currently, commands have unique IDs that are
>>> <Address, Long> where the number part is monotonous per node. That's
>>> easy to achieve. But introducing globally monotonous counter means that
>>> there will be a single contention point. (you can introduce another
>>> contention points by adding backups, but this is probably unnecessary as
>>> you can find out the last id from the indexed cache data). Per-segment
>>> monotonous would be probably more scalabe, though that increases complexity.
>> It is complicated, but one way to do this is to have one “primary” node maintain the log and to have other replicate from it. The cluster does need to use consensus to agree which is the primary, and to know which secondary becomes the primary if the primary is failing. Consensus is not trivial, but JGroups Raft (http://belaban.github.io/jgroups-raft/ <http://belaban.github.io/jgroups-raft/>) may be an option. However, this approach ensures that the replica logs are identical to the primary since they are simply recording the primary’s log as-is. Of course, another challenge is what happens during a failure of the primary log node, and can any transactions be performed/completed while the primary is unavailable.
>>
>> Another option is to have each node maintain their own log, and to have an aggregator log that merges/combines the various logs into one. Not sure how feasible it is to merge logs by getting rid of duplicates and determining a total order, but if it is then it may have better fault tolerance characteristics.
>>
>> Of course, it is possible to have node-specific monotonic IDs. For example, MySQL Global Transaction IDs (GTIDs) use a unique UUID for each node, and then GTIDs consists of the node’s UUID plus a monotonically-increasing value (e.g., “31fc48cd-ecd4-46ad-b0a9-f515fc9497c4:1001”). The transaction log contains a mix of GTIDs, and MySQL replication uses a “GTID set” to describe the ranges of transactions known by a server (e.g., “u1:1-100,u2:1-10000,u3:3-5” where “u1”, “u2”, and “u3” are actually UUIDs). So, when a MySQL replica connects, it says “I know about this GTID set", and this tells the master where that client wants to start reading.
>>
>>> 2) 'The write to the event log would be async in order to not affect
>>> normal data writes'
>>>
>>> Who should write to the cache?
>>> a) originator - what if originator crashes (despite the change has been
>>> added)? Besides, originator would have to do (async) RPC to primary
>>> owner (which will be the primary owner of the event, too).
>>> b) primary owner - with triangle, primary does not really know if the
>>> change has been written on backup. Piggybacking that info won't be
>>> trivial - we don't want to send another message explicitly. But even if
>>> we get the confirmation, since the write to event cache is async, if the
>>> primary owner crashes before replicating the event to backup, we lost
>>> the event
>>> c) all owners, but locally - that will require more complex
>>> reconciliation if the event did really happen on all surviving nodes or
>>> not. And backups could have some trouble to resolve order, too.
>>>
>>> IIUC clustered listeners are called from primary owner before the change
>>> is really confirmed on backups (@Pedro correct me if I am wrong,
>>> please), but for this reliable event cache you need higher level of
>>> consistency.
>> This could be handled by writing a confirmation or “commit” event to the log when the write is confirmed or the transaction is committed. Then, only those confirmed events/transactions would be exposed to client listeners. This requires some buffering, but this could be done in each HotRod client.
>>
>>> 3) The log will also have to filter out retried operations (based on
>>> command ID - though this can be indexed, too). Though, I would prefer to
>>> see per-event command-id log to deal with retries properly.
>> IIUC, a “commit” event would work here, too.
>>
>>> 4) Client should pull data, but I would keep push notifications that
>>> 'something happened' (throttled on server). There could be use case for
>>> rarely updated caches, and polling the servers would be excessive there.
>> IMO the clients should poll, but if the server has nothing to return it blocks until there is something or until a timeout occurs. This makes it easy for clients and actually reduces network traffic compared to constantly polling.
>>
>> BTW, a lot of this is replicating the functionality of Kafka, which is already quite mature and feature rich. It’s actually possible to *embed* Kafka to simplify operations, but I don’t think that’s recommended. And, it introduces a very complex codebase that would need to be supported.
>>
>>> Radim
>>>
>>>>
>>>> [1]
>>>> https://github.com/infinispan/infinispan/wiki/Remote-Listeners-improvement-proposal
>>>>
>>>> Thanks,
>>>> Gustavo
>>>>
>>>>
>>>>
>>>> _______________________________________________
>>>> infinispan-dev mailing list
>>>> infinispan-dev at lists.jboss.org
>>>> https://lists.jboss.org/mailman/listinfo/infinispan-dev
>>>
>>> --
>>> Radim Vansa <rvansa at redhat.com>
>>> JBoss Performance Team
>>>
>>> _______________________________________________
>>> infinispan-dev mailing list
>>> infinispan-dev at lists.jboss.org
>>> https://lists.jboss.org/mailman/listinfo/infinispan-dev
>> _______________________________________________
>> infinispan-dev mailing list
>> infinispan-dev at lists.jboss.org
>> https://lists.jboss.org/mailman/listinfo/infinispan-dev
> _______________________________________________
> infinispan-dev mailing list
> infinispan-dev at lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/infinispan-dev


-- 
Radim Vansa <rvansa at redhat.com>
JBoss Performance Team



More information about the infinispan-dev mailing list