[ https://issues.jboss.org/browse/ISPN-1586?page=com.atlassian.jira.plugin.... ]
Dan Berindei updated ISPN-1586:
-------------------------------
Description:
I reran my test (an embedded ISPN cluster) with ISPN 5.0.0.Final and 5.1 snapshot code.
It is configured in "replication" mode, using a local cache store, with preload=true and
purgeOnStartup=false (see the whole config below).
I get inconsistent data among the nodes in the following scenario:
1) start a 2-node cluster
2) after the cluster is formed, add some data to the cache:
k1-->v1
k2-->v2
The data replicates perfectly at this point.
3) bring node2 down
4) delete entry k1-->v1 through node1
Note: at this point, the local (persistent) cache store on node2 still holds 2 entries.
5) start node2 and wait for it to join the cluster
6) after state merging, node1 has 1 entry and node2 has 2 entries.
I expect the data to be consistent across the cluster.
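For reference, a minimal driver for this scenario might look like the sketch below. This is a rough, single-process approximation; the config file names and the idea of running both nodes in one JVM (each with its own local H2 store) are assumptions for illustration, not part of the original setup.
{code:java}
// Rough reproduction sketch (hypothetical config file names; in the real test each
// node runs in its own JVM, each with its own local H2 cache store).
import org.infinispan.Cache;
import org.infinispan.manager.DefaultCacheManager;

public class Ispn1586Repro {
   public static void main(String[] args) throws Exception {
      // 1) start a 2-node cluster
      DefaultCacheManager cm1 = new DefaultCacheManager("node1.xml");
      DefaultCacheManager cm2 = new DefaultCacheManager("node2.xml");
      Cache<String, String> c1 = cm1.getCache();
      Cache<String, String> c2 = cm2.getCache();

      // 2) add some data once the cluster has formed
      c1.put("k1", "v1");
      c1.put("k2", "v2");

      // 3) bring node2 down; its local store still holds k1 and k2
      cm2.stop();

      // 4) delete k1 through node1
      c1.remove("k1");

      // 5) restart node2 and let it rejoin; preload=true loads k1 and k2 from its store
      cm2 = new DefaultCacheManager("node2.xml");
      c2 = cm2.getCache();

      // 6) after state transfer: node1 reports 1 entry, node2 still reports 2
      System.out.println("node1=" + c1.size() + " node2=" + c2.size());
   }
}
{code}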
Here is the infinispan config:
{code:xml}
<infinispan
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="urn:infinispan:config:5.0 http://www.infinispan.org/schemas/infinispan-config-5.0.xsd"
      xmlns="urn:infinispan:config:5.0">
   <global>
      <transport clusterName="demoCluster"
                 machineId="node1"
                 rackId="r1" nodeName="dexlaptop">
         <properties>
            <property name="configurationFile" value="./jgroups-tcp.xml" />
         </properties>
      </transport>
      <globalJmxStatistics enabled="true"/>
   </global>
   <default>
      <locking
         isolationLevel="READ_COMMITTED"
         lockAcquisitionTimeout="20000"
         writeSkewCheck="false"
         concurrencyLevel="5000"
         useLockStriping="false"
      />
      <jmxStatistics enabled="true"/>
      <clustering mode="replication">
         <stateRetrieval
            timeout="240000"
            fetchInMemoryState="true"
            alwaysProvideInMemoryState="false"
         />
         <!-- Network calls are synchronous. -->
         <sync replTimeout="20000"/>
      </clustering>
      <loaders
         passivation="false"
         shared="false"
         preload="true">
         <loader
            class="org.infinispan.loaders.jdbc.stringbased.JdbcStringBasedCacheStore"
            fetchPersistentState="true"
            purgeOnStartup="false">
            <!-- set purgeOnStartup to true for nodes other than the first one in testing/demo -->
            <properties>
               <property name="stringsTableNamePrefix" value="ISPN_STRING_TABLE"/>
               <property name="idColumnName" value="ID_COLUMN"/>
               <property name="dataColumnName" value="DATA_COLUMN"/>
               <property name="timestampColumnName" value="TIMESTAMP_COLUMN"/>
               <property name="timestampColumnType" value="BIGINT"/>
               <property name="connectionFactoryClass" value="org.infinispan.loaders.jdbc.connectionfactory.PooledConnectionFactory"/>
               <property name="connectionUrl" value="jdbc:h2:file:/var/tmp/h2cachestore;DB_CLOSE_DELAY=-1"/>
               <property name="userName" value="sa"/>
               <property name="driverClass" value="org.h2.Driver"/>
               <property name="idColumnType" value="VARCHAR(255)"/>
               <property name="dataColumnType" value="BINARY"/>
               <property name="dropTableOnExit" value="false"/>
               <property name="createTableOnStart" value="true"/>
            </properties>
            <!--
            <async enabled="false" />
            -->
         </loader>
      </loaders>
   </default>
</infinispan>
{code}
Basically, the current ISPN state transfer implementation results in data inconsistency
among nodes when running in replication mode with a local (per-node) cache store.
I found that BaseStateTransferManagerImpl's applyState() does not remove stale data
from the local cache store, which leaves inconsistent data when a node joins the cluster.
Here is a snippet of applyState():
{code:java}
public void applyState(Collection<InternalCacheEntry> state,
                       Address sender, int viewId) throws InterruptedException {
   ...
   for (InternalCacheEntry e : state) {
      InvocationContext ctx = icc.createInvocationContext(false, 1);
      // locking not necessary as during rehashing we block all transactions
      ctx.setFlags(CACHE_MODE_LOCAL, SKIP_CACHE_LOAD, SKIP_REMOTE_LOOKUP,
                   SKIP_SHARED_CACHE_STORE, SKIP_LOCKING, SKIP_OWNERSHIP_CHECK);
      try {
         PutKeyValueCommand put = cf.buildPutKeyValueCommand(e.getKey(), e.getValue(),
               e.getLifespan(), e.getMaxIdle(), ctx.getFlags());
         interceptorChain.invoke(ctx, put);
      } catch (Exception ee) {
         log.problemApplyingStateForKey(ee.getMessage(), e.getKey());
      }
   }
   ...
}
{code}
As we can see, the code basically tries to add all data entries received from the cluster
(the other node). Hence, it does not know that entries still present in its local cache
store were deleted from the cluster while the node was down. This is exactly my test case
(my configuration is replication mode with each node having its own cache store).
To fix this, we need to delete from the local cache/cache store any entries that no
longer exist in the new state.
I modified the above method by adding the following code before the put loop, and it fixed
the problem in my configuration:
{code:java}
// Remove entries from the local cache/cache store which no longer exist in the new state
for (InternalCacheEntry ie : dataContainer.entrySet()) {
   if (!state.contains(ie)) {
      log.debug("Trying to delete local store entry that no longer exists in the new state: "
            + ie.getKey());
      InvocationContext ctx = icc.createInvocationContext(false, 1);
      // locking not necessary as during rehashing we block all transactions
      ctx.setFlags(CACHE_MODE_LOCAL, SKIP_CACHE_LOAD, SKIP_REMOTE_LOOKUP,
                   SKIP_SHARED_CACHE_STORE, SKIP_LOCKING, SKIP_OWNERSHIP_CHECK);
      try {
         RemoveCommand remove = cf.buildRemoveCommand(ie.getKey(), ie.getValue(), ctx.getFlags());
         interceptorChain.invoke(ctx, remove);
         dataContainer.remove(ie.getKey());
      } catch (Exception ee) {
         log.error("failed to delete local store entry", ee);
      }
   }
}
...
{code}
Obviously, the above "fix" relies on the assumption/configuration that the dataContainer
holds all local entries, i.e., preload=true and no eviction, in replication mode.
The real fix, I think, is to delegate a syncState(state) operation to the cache store
implementation, where we can check the configuration and do the right thing.
For example, in the cache store implementation we can calculate the changes based on the
local data and the new state, and apply the changes there.
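To illustrate the idea, here is a rough sketch of what such a hook could look like. The syncState() method itself is hypothetical, and the loadAllKeys(), remove() and store() calls are assumed from the 5.x CacheStore SPI, so treat this as a sketch rather than a concrete patch:
{code:java}
// Hypothetical sketch only: syncState() is not an existing API. It assumes the
// 5.x CacheStore SPI methods loadAllKeys(), remove() and store().
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.loaders.CacheLoaderException;
import org.infinispan.loaders.CacheStore;

public class StateSyncSketch {
   public static void syncState(CacheStore store, Collection<InternalCacheEntry> state)
         throws CacheLoaderException {
      // Keys that are part of the incoming cluster state
      Set<Object> stateKeys = new HashSet<Object>();
      for (InternalCacheEntry e : state) {
         stateKeys.add(e.getKey());
      }

      // Remove locally persisted keys that the cluster no longer knows about
      for (Object localKey : store.loadAllKeys(null)) {
         if (!stateKeys.contains(localKey)) {
            store.remove(localKey);
         }
      }

      // Write (or overwrite) the entries received from the cluster
      for (InternalCacheEntry e : state) {
         store.store(e);
      }
   }
}
{code}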
Forum Reference: http://community.jboss.org/message/639159#639159, http://community.jboss.org/message/638770#638770
> inconsistent cache data in replication cluster with local (not shared) cache store
> ----------------------------------------------------------------------------------
>
> Key: ISPN-1586
> URL:
https://issues.jboss.org/browse/ISPN-1586
> Project: Infinispan
> Issue Type: Bug
> Components: Core API
> Affects Versions: 5.0.0.FINAL, 5.1.0.CR1
> Environment: ISPN 5.0.0.Final and ISPN 5.1 snapshot
> Java 1.7
> Linux CentOS
> Reporter: dex chen
> Assignee: Dan Berindei
> Fix For: 5.1.0.CR2, 5.1.0.FINAL