[infinispan-dev] [ISPN-116] Async cache store: aggregation of multiple changes on a single key
Amin Abbaspour
a.abbaspour at gmail.com
Sun Jul 26 08:20:08 EDT 2009
Hi Galder,
I checked out the latest version (from subversion trunk) and used it for my test.
Here is the config in XML:
---
<?xml version="1.0" encoding="UTF-8"?>
<!-- Infinispan configuration (schema urn:infinispan:config:4.0) defining a single
     named cache, "accounts", with one cache loader. -->
<infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="urn:infinispan:config:4.0">
<global>
<!-- Declares a transport with cluster name "SmppGatewayCluster".
     NOTE(review): the programmatic setup later in this message uses
     GlobalConfiguration.getNonClusteredDefault() instead; confirm which is intended. -->
<transport clusterName="SmppGatewayCluster"/>
</global>
<namedCache name="accounts">
<!-- FIFO eviction capped at 100 entries.
     NOTE(review): wakeUpInterval is 5000 here but 60 * 1000 in the programmatic
     setup; units are presumably milliseconds, confirm. -->
<eviction wakeUpInterval="5000" maxEntries="100" strategy="FIFO"/>
<!-- NOTE(review): lifespan=600 and maxIdle=1000 differ from the 10000/10000 used in
     the programmatic setup. If these are milliseconds, entries would expire very
     quickly; confirm units and intent. -->
<expiration lifespan="600" maxIdle="1000"/>
<!-- Loader block: passivation off, not shared, preload on. -->
<loaders passivation="false" shared="false" preload="true">
<!-- The custom JDBC store class, with fetchPersistentState and purgeOnStartup
     switched on and modifications not ignored. -->
<loader
class="com.magfa.gateway.dao.infinispan.jdbcstore.AccountJdbcCacheStore"
fetchPersistentState="true"
ignoreModifications="false" purgeOnStartup="true">
<properties/>
<!-- Singleton store is enabled here but is not configured in the programmatic setup.
     NOTE(review): presumably only the cluster coordinator writes to the store in
     this mode, which might explain storeLockSafe not being called on a given node;
     verify against the singleton store documentation. -->
<singletonStore enabled="true"
pushStateWhenCoordinator="true" pushStateTimeout="20000"/>
<!-- Async store enabled with threadPoolSize=5. -->
<async enabled="true" threadPoolSize="5"/>
</loader>
</loaders>
</namedCache>
</infinispan>
---
And here is my similar config code:
---
// --- Cache store: the custom JDBC store, wrapped by the async store. ---
AsyncStoreConfig asyncStoreConfig = new AsyncStoreConfig();
asyncStoreConfig.setEnabled(true);

CacheStoreConfig cacheStoreConfig = new AccountJdbcCacheStoreConfig();
cacheStoreConfig.setAsyncStoreConfig(asyncStoreConfig);
cacheStoreConfig.setPurgeSynchronously(false);

// --- Loader manager holding that single store. ---
List<CacheLoaderConfig> loaderConfigs = new ArrayList<CacheLoaderConfig>(1);
loaderConfigs.add(cacheStoreConfig);
CacheLoaderManagerConfig loaderManagerConfig = new CacheLoaderManagerConfig();
loaderManagerConfig.setCacheLoaderConfigs(loaderConfigs);

// --- Per-cache settings: loaders, FIFO eviction, expiration, cache mode. ---
Configuration config = new Configuration();
config.setCacheLoaderManagerConfig(loaderManagerConfig);
config.setEvictionStrategy(EvictionStrategy.FIFO);
config.setExpirationLifespan(10000);
config.setExpirationMaxIdle(10000);
config.setEvictionMaxEntries(100);
config.setEvictionWakeUpInterval(60 * 1000);
config.setCacheMode(Configuration.CacheMode.INVALIDATION_ASYNC);
// todo: important
//CacheStoreConfig

// --- Global settings and the cache manager itself. ---
// NOTE(review): a non-clustered global configuration is combined with the
// INVALIDATION_ASYNC cache mode above, while the XML variant declares a
// transport; confirm that this combination is intended.
GlobalConfiguration globalConfiguration = GlobalConfiguration.getNonClusteredDefault();
CacheManager manager = new DefaultCacheManager(globalConfiguration, config);
---
and here is my custom store:
---
public class AccountJdbcCacheStore extends LockSupportCacheStore {
private static final Logger log =
Logger.getLogger(AccountJdbcCacheStore.class);
private final ConnectionPool pool;
// todo; why constructor is called twice?
public AccountJdbcCacheStore() {
Properties properties = new Properties();
properties.setProperty("driver", "com.mysql.jdbc.Driver");
properties.setProperty("url",
"jdbc:mysql://dbsrv:3306/accounts?autoReconnect=true");
properties.setProperty("user", "*****");
properties.setProperty("password", "*****");
pool = new ProxoolConnectionPool(properties);
}
protected void clearLockSafe() throws CacheLoaderException {
throw new CacheLoaderException("Will not clear AccountBillingTable");
}
private static final String LOAD_ACCOUNTS_QUERY = "SELECT * FROM
accountsBilling";
protected Set<InternalCacheEntry> loadAllLockSafe() throws
CacheLoaderException {
log.trace("loadAllLockSafe()");
Connection connection = null;
Statement st = null;
ResultSet rs = null;
try {
connection = pool.getConnection();
st = connection.createStatement();
rs = st.executeQuery(LOAD_ACCOUNTS_QUERY); // todo: where enable=1
final Set<InternalCacheEntry> results = new
HashSet<InternalCacheEntry>();
while (rs.next()) {
results.add(InternalEntryFactory.create(rs.getLong("id"),
EngineCompatibleBillingBackend.fetchFromResultSet(rs)));
}
return results;
} catch (SQLException e) {
throw new RuntimeException("Exception in loading accounts
from good old accountsBilling ", e);
} finally {
pool.attemptClose(rs);
pool.attemptClose(st);
pool.attemptClose(connection);
}
}
protected void toStreamLockSafe(ObjectOutput oos) throws
CacheLoaderException {
throw new CacheLoaderException("toStreamLockSafe(); What should
i do with ObjectOutput: " + oos);
}
protected void fromStreamLockSafe(ObjectInput ois) throws
CacheLoaderException {
throw new CacheLoaderException("fromStreamLockSafe(); What
should i do with ObjectInput: " + ois);
}
protected boolean removeLockSafe(Object key, String lockingKey)
throws CacheLoaderException {
log.trace("removeLockSafe(); will not remove anything from
AccountsTable with key: " + key + ", lockingKey: " + lockingKey);
return true;
}
private final static double OUTGOING_FARSI_COST_BASE = 71.2d;
private final static double OUTGOING_ENGLISH_COST_BASE = 177.6d;
private final static String UPDATE_ACCOUNT_QUERY;
static {
final String TOTAL_MESSAGE_COST_FORMULA = "(?*(farsiCost+" +
OUTGOING_FARSI_COST_BASE + ") + (?*(englishCost+"
+ OUTGOING_ENGLISH_COST_BASE + "))+(?*incomingCost))";
UPDATE_ACCOUNT_QUERY = "UPDATE accountsBilling SET
totalSentMessages = totalSentMessages + ?, " +
"totalReceivedMessages = totalReceivedMessages + ?,
farsiSentMessages = farsiSentMessages + ? , " +
"englishSentMessages = englishSentMessages + ?, " +
"usedCredit2 = usedCredit2 + " +
TOTAL_MESSAGE_COST_FORMULA + ',' +
"credit2 = credit2 - " + TOTAL_MESSAGE_COST_FORMULA + ' ' +
"WHERE id=?";
}
private static final AtomicInteger storeCounter = new AtomicInteger();
protected void storeLockSafe(InternalCacheEntry ed, String
lockingKey) throws CacheLoaderException {
final Account account = (Account) ed.getValue();
log.info("-------> storeLockSafe(" +
storeCounter.incrementAndGet() + "); storing account: " + account + ",
lockingKey: " + lockingKey);
final long sent = account.getSentAndReset();
final long received = account.getReceivedAndReset();
final long dlr = account.getDlrAndReset();
final long farsiSent = account.getFarsiSentAndReset();
final long englishSent = account.getEnglishSentAndReset();
Connection connection = null;
PreparedStatement st = null;
try {
connection = pool.getConnection();
st = connection.prepareStatement(UPDATE_ACCOUNT_QUERY);
st.setLong(1, sent);
st.setLong(2, received);
st.setLong(3, farsiSent);
st.setLong(4, englishSent);
st.setLong(5, farsiSent);
st.setLong(6, englishSent);
st.setLong(7, received);
st.setLong(8, farsiSent);
st.setLong(9, englishSent);
st.setLong(10, received);
st.setLong(11, account.getId());
st.executeUpdate();
} catch (SQLException e) {
throw new RuntimeException("Exception in updating
accountsBilling from engine DB", e);
} finally {
pool.attemptClose(st);
pool.attemptClose(connection);
}
}
protected InternalCacheEntry loadLockSafe(Object key, String
lockingKey) throws CacheLoaderException {
log.info("loadLockSafe(), key: " + key + ", lockingKey: " + lockingKey);
Connection conn = null;
Statement st = null;
ResultSet rs = null;
try {
conn = pool.getConnection();
st = conn.createStatement();
rs = st.executeQuery("SELECT * FROM accountsBilling WHERE
id=" + lockingKey);
if (rs.next()) {
return InternalEntryFactory.create(key,
EngineCompatibleBillingBackend.fetchFromResultSet(rs));
}
return null;
} catch (SQLException e) {
final String message = "SQL error while fetching strored
entry with key:" + key + " lockingKey: " + lockingKey;
log.error(message, e);
throw new CacheLoaderException(message, e);
} finally {
pool.attemptClose(rs);
pool.attemptClose(st);
pool.attemptClose(conn);
}
}
protected String getLockFromKey(Object key) throws CacheLoaderException {
return key.toString();
}
public Class<? extends CacheLoaderConfig> getConfigurationClass() {
return AccountJdbcCacheStoreConfig.class;
}
}
----
and custom config:
---
/**
 * Configuration for {@link AccountJdbcCacheStore}. The loader class name is
 * fixed to that store and cannot be overridden.
 */
public class AccountJdbcCacheStoreConfig extends LockSupportCacheStoreConfig {

    /**
     * Returns a copy of this configuration.
     *
     * <p>Delegates to the inherited {@code clone()} so that settings applied to
     * this instance (for example the async store config) carry over to the
     * copy, instead of handing back a fresh instance with default values.
     * NOTE(review): assumes the inherited {@code clone()} is public, declares
     * no checked exception and copies the parent's fields; confirm against the
     * Infinispan version in use.
     *
     * @return a copy carrying this instance's settings
     */
    public LockSupportCacheStoreConfig clone() {
        return (LockSupportCacheStoreConfig) super.clone();
    }

    /**
     * @return the fully qualified name of {@link AccountJdbcCacheStore}
     */
    public String getCacheLoaderClassName() {
        return AccountJdbcCacheStore.class.getName();
    }

    /**
     * Ignored on purpose: the loader class is always {@link AccountJdbcCacheStore}.
     *
     * @param s requested loader class name; not used
     */
    public void setCacheLoaderClassName(String s) {
        // Intentionally empty; see getCacheLoaderClassName().
    }
}
---
but I still have two issues:
1. Using config via code, I get the flush to db working but still no
aggregation. Even with 100 updates I still see 100 store calls.
2. Using the XML config, storeLockSafe is not called at all.
Thanks for your help,
Amin Abbaspour
On Mon, Jul 20, 2009 at 7:09 PM, Galder
Zamarreno<galder.zamarreno at redhat.com> wrote:
>
>
> On 07/20/2009 02:39 PM, Manik Surtani wrote:
>>
>> On 20 Jul 2009, at 12:35, Amin Abbaspour wrote:
>>
>>> Hi,
>>>
>>> Thanks Galder. I will test CoalescedAsyncStore ASAP and let you know.
>>>
>>> BTW Manik, I think the change is so much that we'd better see this as
>>> a completely new store and later replace AsyncCacheStore or keep them
>>> both.
>>
>> I agree that the impl is significantly different, but from a
>> functional/feature standpoint, what does it offer over and above the
>> AsyncCacheStore, except that it just does the same thing "better" ? :-)
>
> I agree with Manik. I would have only left the other implementation if there
> was anything in particular the past implementation offered versus the
> coalesced one. My perf tests indicated that we're as fast as before but with
> the added bonus of lesser cache store calls, so that looks to me a win/win
> situation.
>
> I've now committed https://jira.jboss.org/jira/browse/ISPN-116 and added an
> FAQ entry to http://www.jboss.org/community/wiki/InfinispanTechnicalFAQs
>
> I'll talk about this further on a blog entry later this week.
>
> FAO Amin, if you wanna try it out, checkout latest from trunk and let us
> know what you think. No need for configuration changes, simply add <async
> enabled="true" /> and that should be enough.
>
>>
>>
>>> Regards,
>>> Amin Abbaspour
>>>
>>> On Mon, Jul 20, 2009 at 2:47 PM, Manik Surtani<manik at jboss.org> wrote:
>>>>
>>>> Nope, never got the email below.
>>>>
>>>> Anyway, I always thought the intention was to enhance or improve the
>>>> AsyncCacheStore, not to create another one.
>>>>
>>>> Cheers
>>>> Manik
>>>>
>>>> On 20 Jul 2009, at 10:55, Galder Zamarreno wrote:
>>>>
>>>>> Did other list readers ever get this message?
>>>>>
>>>>> I did send it last week but never actually got it via the dev
>>>>> list, so
>>>>> double checking...
>>>>>
>>>>> On 07/15/2009 07:58 PM, Galder Zamarreno wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> As part of this JIRA, I'm thinking whether we wanna leave the
>>>>>> current
>>>>>> queue based async store.
>>>>>>
>>>>>> For the work I've done, I've created a new class called
>>>>>> CoalescedAsyncStore that works in the way agreed below, iow,
>>>>>> instead of
>>>>>> queuing mods, it keeps a ConcurrentMap which is swapped for a
>>>>>> new
>>>>>> instance when the async thread is going to apply the changes.
>>>>>>
>>>>>> I've run some perf tests between CoalescedAsyncStore and AsyncStore
>>>>>> and
>>>>>> when doing 1 million store where each key is different, the
>>>>>> performance
>>>>>> is fairly similar. The difference comes when trying to run a
>>>>>> similar
>>>>>> test where the same key is always updated, this results in lesser
>>>>>> underlying store calls and hence CoalescedAsyncStore solution is
>>>>>> faster
>>>>>> here.
>>>>>>
>>>>>> So, rather than keeping both, I'd suggest replacing AsyncStore with
>>>>>> the
>>>>>> impl in CoalescedAsyncStore. We also need to look at the
>>>>>> configuration
>>>>>> for the new AsyncStore:
>>>>>>
>>>>>> Configuration wise in AsyncStoreConfig, I'd leave threadPoolSize,
>>>>>> enabled and add the read/write lock acquisition timeout. The rest
>>>>>> I'd
>>>>>> get rid of, as they no longer apply:
>>>>>>
>>>>>> int batchSize = 100;
>>>>>> long pollWait = 100;
>>>>>> int queueSize = 10000;
>>>>>>
>>>>>> By default, I'd give 5000ms to the read/write lock acquisition
>>>>>> timeout.
>>>>>>
>>>>>> Thoughts?
>>>>>>
>>>>>> On 07/09/2009 01:08 PM, Manik Surtani wrote:
>>>>>>>
>>>>>>> On 9 Jul 2009, at 11:37, Galder Zamarreno wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> On 07/09/2009 11:55 AM, Manik Surtani wrote:
>>>>>>>>>
>>>>>>>>> On 8 Jul 2009, at 19:53, Jason T. Greene wrote:
>>>>>>>>>
>>>>>>>>>> Manik Surtani wrote:
>>>>>>>>>>>
>>>>>>>>>>> * Make the concurrent map volatile
>>>>>>>>>>> * When iterating, first create a new ConcurrentMap and replace
>>>>>>>>>>> the
>>>>>>>>>>> old one with the new one so all concurrent threads write to
>>>>>>>>>>> the new
>>>>>>>>>>> Map
>>>>>>>>>>> * Iterate over the old map
>>>>>>>>>>
>>>>>>>>>> That would lead to race conditions since a concurrent writing
>>>>>>>>>> thread
>>>>>>>>>> could write to the "old" map, either by getting a recent yet
>>>>>>>>>> incorrect
>>>>>>>>>> read off the volatile, or reading it right before it changes.
>>>>>>>>>
>>>>>>>>> True, since referencing the map and writing to it isn't atomic.
>>>>>>>>>
>>>>>>>>> We could guard access to the map with a read/write lock. Safe,
>>>>>>>>> if a
>>>>>>>>> little heavy-handed... map writers would acquire a RL (since we
>>>>>>>>> want
>>>>>>>>> concurrent access here) but the async flushing thread would need
>>>>>>>>> to
>>>>>>>>> wait
>>>>>>>>> for a WL to swap the map reference, releasing the lock after the
>>>>>>>>> map
>>>>>>>>> reference has been swapped.
>>>>>>>>
>>>>>>>> Yeah, I don't think it could be volatile because it only
>>>>>>>> applies to
>>>>>>>> the variable itself, not the contents pointed at. The R/W lock
>>>>>>>> approach looks like a valid one and better than using
>>>>>>>> synchronized
>>>>>>>> blocks. Another thing I had in my mind is whether we need the
>>>>>>>> copy to
>>>>>>>> be a ConcurrentMap, we could probably just use a
>>>>>>>> Immutables.immutableMapCopy().
>>>>>>>
>>>>>>> Sync blocks would suck since it is effectively an exclusive
>>>>>>> lock. We
>>>>>>> want non-exclusive (read) locks for threads writing to this map
>>>>>>> - we
>>>>>>> want multiple application threads to do this concurrently
>>>>>>> otherwise this
>>>>>>> becomes a bottleneck. And hence the need for a ConcurrentMap.
>>>>>>>
>>>>>>> And we don't make a copy at all - all the async flushing thread
>>>>>>> does is
>>>>>>> that it creates a new, empty ConcurrentMap and swaps the 'async
>>>>>>> queue'
>>>>>>> Map so that application threads can continue logging their changes
>>>>>>> somewhere while old changes can be iterated through and flushed.
>>>>>>> And for
>>>>>>> this, the field still needs to be volatile (not strictly so, since
>>>>>>> we
>>>>>>> can rely on the fencing that will happen in the lock as a side-
>>>>>>> effect)
>>>>>>>
>>>>>>>> I looked at the two implementations (ConcurrentHashMap(Map)
>>>>>>>> constructor vs Immutables.immutableMapCopy() which would
>>>>>>>> Object.clone() on the map) but I'm not sure which one would be
>>>>>>>> faster.
>>>>>>>> Since clone() would rely on a native clone() impl, I'd imagine
>>>>>>>> that to
>>>>>>>> be faster.
>>>>>>>
>>>>>>> Like I said, we don't need a copy. We need a new, empty map for
>>>>>>> threads
>>>>>>> to log changes to while the old one's being flushed.
>>>>>>>
>>>>>>>>> --
>>>>>>>>> Manik Surtani
>>>>>>>>> manik at jboss.org
>>>>>>>>> Lead, Infinispan
>>>>>>>>> Lead, JBoss Cache
>>>>>>>>> http://www.infinispan.org
>>>>>>>>> http://www.jbosscache.org
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> _______________________________________________
>>>>>>>>> infinispan-dev mailing list
>>>>>>>>> infinispan-dev at lists.jboss.org
>>>>>>>>> https://lists.jboss.org/mailman/listinfo/infinispan-dev
>>>>>>>>
>>>>>>>> --
>>>>>>>> Galder Zamarreño
>>>>>>>> Sr. Software Engineer
>>>>>>>> Infinispan, JBoss Cache
>>>>>>>
>>>>>>> --
>>>>>>> Manik Surtani
>>>>>>> manik at jboss.org
>>>>>>> Lead, Infinispan
>>>>>>> Lead, JBoss Cache
>>>>>>> http://www.infinispan.org
>>>>>>> http://www.jbosscache.org
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>> --
>>>>> Galder Zamarreño
>>>>> Sr. Software Engineer
>>>>> Infinispan, JBoss Cache
>>>>> _______________________________________________
>>>>> infinispan-dev mailing list
>>>>> infinispan-dev at lists.jboss.org
>>>>> https://lists.jboss.org/mailman/listinfo/infinispan-dev
>>>>
>>>> --
>>>> Manik Surtani
>>>> manik at jboss.org
>>>> Lead, Infinispan
>>>> Lead, JBoss Cache
>>>> http://www.infinispan.org
>>>> http://www.jbosscache.org
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> _______________________________________________
>>>> infinispan-dev mailing list
>>>> infinispan-dev at lists.jboss.org
>>>> https://lists.jboss.org/mailman/listinfo/infinispan-dev
>>>>
>>> _______________________________________________
>>> infinispan-dev mailing list
>>> infinispan-dev at lists.jboss.org
>>> https://lists.jboss.org/mailman/listinfo/infinispan-dev
>>
>> --
>> Manik Surtani
>> manik at jboss.org
>> Lead, Infinispan
>> Lead, JBoss Cache
>> http://www.infinispan.org
>> http://www.jbosscache.org
>>
>>
>>
>>
>>
>> _______________________________________________
>> infinispan-dev mailing list
>> infinispan-dev at lists.jboss.org
>> https://lists.jboss.org/mailman/listinfo/infinispan-dev
>
> --
> Galder Zamarreño
> Sr. Software Engineer
> Infinispan, JBoss Cache
>
More information about the infinispan-dev
mailing list