Hi Galder,
I checked out latest version (from subversion trunk) and used it for my test.
Here is config in xml:
---
<?xml version="1.0" encoding="UTF-8"?>
<infinispan
xmlns="urn:infinispan:config:4.0">
<global>
<transport clusterName="SmppGatewayCluster"/>
</global>
<namedCache name="accounts">
<eviction wakeUpInterval="5000" maxEntries="100"
strategy="FIFO"/>
<expiration lifespan="600" maxIdle="1000"/>
<loaders passivation="false" shared="false"
preload="true">
<loader
class="com.magfa.gateway.dao.infinispan.jdbcstore.AccountJdbcCacheStore"
fetchPersistentState="true"
ignoreModifications="false"
purgeOnStartup="true">
<properties/>
<singletonStore enabled="true"
pushStateWhenCoordinator="true" pushStateTimeout="20000"/>
<async enabled="true" threadPoolSize="5"/>
</loader>
</loaders>
</namedCache>
</infinispan>
---
And here is my similar config code:
---
Configuration config = new Configuration();
GlobalConfiguration globalConfiguration =
GlobalConfiguration.getNonClusteredDefault();
AsyncStoreConfig asyncStoreConfig = new AsyncStoreConfig();
asyncStoreConfig.setEnabled(true);
CacheStoreConfig cacheStoreConfig = new AccountJdbcCacheStoreConfig();
cacheStoreConfig.setAsyncStoreConfig(asyncStoreConfig);
cacheStoreConfig.setPurgeSynchronously(false);
CacheLoaderManagerConfig clmc = new CacheLoaderManagerConfig();
List<CacheLoaderConfig> cacheStoreConfigs = new
ArrayList<CacheLoaderConfig>(1);
cacheStoreConfigs.add(cacheStoreConfig);
clmc.setCacheLoaderConfigs(cacheStoreConfigs);
config.setCacheLoaderManagerConfig(clmc);
config.setEvictionStrategy(EvictionStrategy.FIFO);
config.setExpirationLifespan(10000);
config.setExpirationMaxIdle(10000);
config.setEvictionMaxEntries(100);
config.setEvictionWakeUpInterval(60 * 1000);
config.setCacheMode(Configuration.CacheMode.INVALIDATION_ASYNC);
// todo: important
//CacheStoreConfig
CacheManager manager = new
DefaultCacheManager(globalConfiguration, config);
---
and here is my custom store:
---
public class AccountJdbcCacheStore extends LockSupportCacheStore {
private static final Logger log =
Logger.getLogger(AccountJdbcCacheStore.class);
private final ConnectionPool pool;
// todo; why constructor is called twice?
public AccountJdbcCacheStore() {
Properties properties = new Properties();
properties.setProperty("driver", "com.mysql.jdbc.Driver");
properties.setProperty("url",
"jdbc:mysql://dbsrv:3306/accounts?autoReconnect=true");
properties.setProperty("user", "*****");
properties.setProperty("password", "*****");
pool = new ProxoolConnectionPool(properties);
}
protected void clearLockSafe() throws CacheLoaderException {
throw new CacheLoaderException("Will not clear AccountBillingTable");
}
private static final String LOAD_ACCOUNTS_QUERY = "SELECT * FROM
accountsBilling";
protected Set<InternalCacheEntry> loadAllLockSafe() throws
CacheLoaderException {
log.trace("loadAllLockSafe()");
Connection connection = null;
Statement st = null;
ResultSet rs = null;
try {
connection = pool.getConnection();
st = connection.createStatement();
rs = st.executeQuery(LOAD_ACCOUNTS_QUERY); // todo: where enable=1
final Set<InternalCacheEntry> results = new
HashSet<InternalCacheEntry>();
while (rs.next()) {
results.add(InternalEntryFactory.create(rs.getLong("id"),
EngineCompatibleBillingBackend.fetchFromResultSet(rs)));
}
return results;
} catch (SQLException e) {
throw new RuntimeException("Exception in loading accounts
from good old accountsBilling ", e);
} finally {
pool.attemptClose(rs);
pool.attemptClose(st);
pool.attemptClose(connection);
}
}
protected void toStreamLockSafe(ObjectOutput oos) throws
CacheLoaderException {
throw new CacheLoaderException("toStreamLockSafe(); What should
i do with ObjectOutput: " + oos);
}
protected void fromStreamLockSafe(ObjectInput ois) throws
CacheLoaderException {
throw new CacheLoaderException("fromStreamLockSafe(); What
should i do with ObjectInput: " + ois);
}
protected boolean removeLockSafe(Object key, String lockingKey)
throws CacheLoaderException {
log.trace("removeLockSafe(); will not remove anything from
AccountsTable with key: " + key + ", lockingKey: " + lockingKey);
return true;
}
private final static double OUTGOING_FARSI_COST_BASE = 71.2d;
private final static double OUTGOING_ENGLISH_COST_BASE = 177.6d;
private final static String UPDATE_ACCOUNT_QUERY;
static {
final String TOTAL_MESSAGE_COST_FORMULA = "(?*(farsiCost+" +
OUTGOING_FARSI_COST_BASE + ") + (?*(englishCost+"
+ OUTGOING_ENGLISH_COST_BASE + "))+(?*incomingCost))";
UPDATE_ACCOUNT_QUERY = "UPDATE accountsBilling SET
totalSentMessages = totalSentMessages + ?, " +
"totalReceivedMessages = totalReceivedMessages + ?,
farsiSentMessages = farsiSentMessages + ? , " +
"englishSentMessages = englishSentMessages + ?, " +
"usedCredit2 = usedCredit2 + " +
TOTAL_MESSAGE_COST_FORMULA + ',' +
"credit2 = credit2 - " + TOTAL_MESSAGE_COST_FORMULA + ' '
+
"WHERE id=?";
}
private static final AtomicInteger storeCounter = new AtomicInteger();
protected void storeLockSafe(InternalCacheEntry ed, String
lockingKey) throws CacheLoaderException {
final Account account = (Account) ed.getValue();
log.info("-------> storeLockSafe(" +
storeCounter.incrementAndGet() + "); storing account: " + account + ",
lockingKey: " + lockingKey);
final long sent = account.getSentAndReset();
final long received = account.getReceivedAndReset();
final long dlr = account.getDlrAndReset();
final long farsiSent = account.getFarsiSentAndReset();
final long englishSent = account.getEnglishSentAndReset();
Connection connection = null;
PreparedStatement st = null;
try {
connection = pool.getConnection();
st = connection.prepareStatement(UPDATE_ACCOUNT_QUERY);
st.setLong(1, sent);
st.setLong(2, received);
st.setLong(3, farsiSent);
st.setLong(4, englishSent);
st.setLong(5, farsiSent);
st.setLong(6, englishSent);
st.setLong(7, received);
st.setLong(8, farsiSent);
st.setLong(9, englishSent);
st.setLong(10, received);
st.setLong(11, account.getId());
st.executeUpdate();
} catch (SQLException e) {
throw new RuntimeException("Exception in updating
accountsBilling from engine DB", e);
} finally {
pool.attemptClose(st);
pool.attemptClose(connection);
}
}
protected InternalCacheEntry loadLockSafe(Object key, String
lockingKey) throws CacheLoaderException {
log.info("loadLockSafe(), key: " + key + ", lockingKey: " +
lockingKey);
Connection conn = null;
Statement st = null;
ResultSet rs = null;
try {
conn = pool.getConnection();
st = conn.createStatement();
rs = st.executeQuery("SELECT * FROM accountsBilling WHERE
id=" + lockingKey);
if (rs.next()) {
return InternalEntryFactory.create(key,
EngineCompatibleBillingBackend.fetchFromResultSet(rs));
}
return null;
} catch (SQLException e) {
final String message = "SQL error while fetching strored
entry with key:" + key + " lockingKey: " + lockingKey;
log.error(message, e);
throw new CacheLoaderException(message, e);
} finally {
pool.attemptClose(rs);
pool.attemptClose(st);
pool.attemptClose(conn);
}
}
protected String getLockFromKey(Object key) throws CacheLoaderException {
return key.toString();
}
public Class<? extends CacheLoaderConfig> getConfigurationClass() {
return AccountJdbcCacheStoreConfig.class;
}
}
----
and custom config:
---
public class AccountJdbcCacheStoreConfig extends LockSupportCacheStoreConfig {
private static final Logger log =
Logger.getLogger(AccountJdbcCacheStoreConfig.class);
public LockSupportCacheStoreConfig clone() {
return new AccountJdbcCacheStoreConfig();
}
public String getCacheLoaderClassName() {
return AccountJdbcCacheStore.class.getName();
}
public void setCacheLoaderClassName(String s) {
}
}
---
but I still have two issues:
1. Using config via code, I get the flush to db working but still no
aggregation. For 100 even I have 100 stores.
2. Using xml config, no storeLockSafe is called at all?
Thanks for your help,
Amin Abbaspour
On Mon, Jul 20, 2009 at 7:09 PM, Galder
Zamarreno<galder.zamarreno(a)redhat.com> wrote:
On 07/20/2009 02:39 PM, Manik Surtani wrote:
>
> On 20 Jul 2009, at 12:35, Amin Abbaspour wrote:
>
>> Hi,
>>
>> Thanks Galder. I will test CoalescedAsyncStore ASAP and let you know.
>>
>> BTW Manik, I think the change is so much that we'd better see this as
>> a completely new store and later replace AsyncCacheStore or keep them
>> both.
>
> I agree that the impl is significantly different, but from a
> functional/feature standpoint, what does it offer over and above the
> AsyncCacheStore, except that it just does the same thing "better" ? :-)
I agree with Manik. I would have only left the other implementation if there
was anything in particular the past implementation offered versus the
coalesced one. My perf tests indicated that we're as fast as before but with
the added bonus of lesser cache store calls, so that looks to me a win/win
situation.
I've now committed
https://jira.jboss.org/jira/browse/ISPN-116 and added an
FAQ entry to
http://www.jboss.org/community/wiki/InfinispanTechnicalFAQs
I'll talk about this further on a blog entry later this week.
FAO Amin, if you wanna try it out, checkout latest from trunk and let us
know what you think. No need for configuration changes, simply add <async
enabled="true" /> and that should be enough.
>
>
>> Regards,
>> Amin Abbaspour
>>
>> On Mon, Jul 20, 2009 at 2:47 PM, Manik Surtani<manik(a)jboss.org> wrote:
>>>
>>> Nope, never got the email below.
>>>
>>> Anyway, I always thought the intention was to enhance or improve the
>>> AsyncCacheStore, not to create another one.
>>>
>>> Cheers
>>> Manik
>>>
>>> On 20 Jul 2009, at 10:55, Galder Zamarreno wrote:
>>>
>>>> Did other list readers ever got this message?
>>>>
>>>> I did send it last week but never actually got it via the dev
>>>> list, so
>>>> double checking...
>>>>
>>>> On 07/15/2009 07:58 PM, Galder Zamarreno wrote:
>>>>>
>>>>> Hi,
>>>>>
>>>>> As part of this JIRA, I'm thinking whether we wanna leave the
>>>>> current
>>>>> queue based async store.
>>>>>
>>>>> For the work I've done, I've created a new class called
>>>>> CoalescedAsyncStore that works in the way agreed below, iow,
>>>>> instead of
>>>>> queying mods, it keeps a ConcurrentMap and which is swapped by a
>>>>> new
>>>>> instance when the async thread is going to apply the changes.
>>>>>
>>>>> I've run some perf tests between CoalescedAsyncStore and
AsyncStore
>>>>> and
>>>>> when doing 1 million store where each key is different, the
>>>>> performance
>>>>> is fairly similar. The difference comes when trying to run a
>>>>> similar
>>>>> test where the same key is always updated, this results in lesser
>>>>> underlying store calls and hence CoalescedAsyncStore solution is
>>>>> faster
>>>>> here.
>>>>>
>>>>> So, rather than keeping both, I'd suggest replacing AsyncStore
with
>>>>> the
>>>>> impl in CoalescedAsyncStore. We also need to look at the
>>>>> configuration
>>>>> for the new AsyncStore:
>>>>>
>>>>> Configuration wise in AsyncStoreConfig, I'd leave
threadPoolSize,
>>>>> enabled and add the read/write lock acquisition timeout. The rest
>>>>> I'd
>>>>> get rid as no longer apply:
>>>>>
>>>>> int batchSize = 100;
>>>>> long pollWait = 100;
>>>>> int queueSize = 10000;
>>>>>
>>>>> By default, I'd give 5000ms to the read/write lock acquisition
>>>>> timeout.
>>>>>
>>>>> Thoughts?
>>>>>
>>>>> On 07/09/2009 01:08 PM, Manik Surtani wrote:
>>>>>>
>>>>>> On 9 Jul 2009, at 11:37, Galder Zamarreno wrote:
>>>>>>
>>>>>>>
>>>>>>> On 07/09/2009 11:55 AM, Manik Surtani wrote:
>>>>>>>>
>>>>>>>> On 8 Jul 2009, at 19:53, Jason T. Greene wrote:
>>>>>>>>
>>>>>>>>> Manik Surtani wrote:
>>>>>>>>>>
>>>>>>>>>> * Make the concurrent map volatile
>>>>>>>>>> * When iterating, first create a new
ConcurrentMap and replace
>>>>>>>>>> the
>>>>>>>>>> old one with the new one so all concurrent
threads write to
>>>>>>>>>> the new
>>>>>>>>>> Map
>>>>>>>>>> * Iterate over the old map
>>>>>>>>>
>>>>>>>>> That would lead to race conditions since a concurrent
writing
>>>>>>>>> thread
>>>>>>>>> could write to the "old" map, either by
getting a recent yet
>>>>>>>>> incorrect
>>>>>>>>> read off the volatile, or reading it right before it
changes.
>>>>>>>>
>>>>>>>> True, since referencing the map and writing to it
isn't atomic.
>>>>>>>>
>>>>>>>> We could guard access to the map with a read/write lock.
Safe,
>>>>>>>> if a
>>>>>>>> little heavy-handed... map writers would acquire a RL
(since we
>>>>>>>> want
>>>>>>>> concurrent access here) but the async flushing thread
would need
>>>>>>>> to
>>>>>>>> wait
>>>>>>>> for a WL to swap the map reference, releasing the lock
after the
>>>>>>>> map
>>>>>>>> reference has been swapped.
>>>>>>>
>>>>>>> Yeah, I don't think it could be volatile because it only
>>>>>>> applies to
>>>>>>> the variable itself, not the contents pointed at. The R/W
lock
>>>>>>> approach looks like a valid one and better than using
>>>>>>> synchronized
>>>>>>> blocks. Another thing I had in my mind is whether we need
the
>>>>>>> copy to
>>>>>>> be a ConcurrentMap, we could probably just use a
>>>>>>> Immutables.immutableMapCopy().
>>>>>>
>>>>>> Sync blocks would suck since it is effectively an exclusive
>>>>>> lock. We
>>>>>> want non-exclusive (read) locks for threads writing to this map
>>>>>> - we
>>>>>> want multiple application threads to do this concurrently
>>>>>> otherwise this
>>>>>> becomes a bottleneck. And hence the need for a ConcurrentMap.
>>>>>>
>>>>>> And we don't make a copy at all - all the async flushing
thread
>>>>>> does is
>>>>>> that it creates a new, empty ConcurrentMap and swaps the
'async
>>>>>> queue'
>>>>>> Map so that application threads can continue logging their
changes
>>>>>> somewhere while old changes can be iterated through and flushed.
>>>>>> And for
>>>>>> this, the field still needs to be volatile (not strictly so,
since
>>>>>> we
>>>>>> can rely on the fencing that will happen in the lock as a side-
>>>>>> effect)
>>>>>>
>>>>>>> I looked at the two implementations (ConcurrentHashMap(Map)
>>>>>>> constructor vs Immutables.immutableMapCopy() which would
>>>>>>> Object.clone() on the map) but I'm not sure which one
would be
>>>>>>> faster.
>>>>>>> Since clone() would rely on a native clone() impl, I'd
imagine
>>>>>>> that to
>>>>>>> be faster.
>>>>>>
>>>>>> Like I said, we don't need a copy. We need a new, empty map
for
>>>>>> threads
>>>>>> to log changes to while the old one's being flushed.
>>>>>>
>>>>>>>> --
>>>>>>>> Manik Surtani
>>>>>>>> manik(a)jboss.org
>>>>>>>> Lead, Infinispan
>>>>>>>> Lead, JBoss Cache
>>>>>>>>
http://www.infinispan.org
>>>>>>>>
http://www.jbosscache.org
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> _______________________________________________
>>>>>>>> infinispan-dev mailing list
>>>>>>>> infinispan-dev(a)lists.jboss.org
>>>>>>>>
https://lists.jboss.org/mailman/listinfo/infinispan-dev
>>>>>>>
>>>>>>> --
>>>>>>> Galder Zamarreño
>>>>>>> Sr. Software Engineer
>>>>>>> Infinispan, JBoss Cache
>>>>>>
>>>>>> --
>>>>>> Manik Surtani
>>>>>> manik(a)jboss.org
>>>>>> Lead, Infinispan
>>>>>> Lead, JBoss Cache
>>>>>>
http://www.infinispan.org
>>>>>>
http://www.jbosscache.org
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>> --
>>>> Galder Zamarreño
>>>> Sr. Software Engineer
>>>> Infinispan, JBoss Cache
>>>> _______________________________________________
>>>> infinispan-dev mailing list
>>>> infinispan-dev(a)lists.jboss.org
>>>>
https://lists.jboss.org/mailman/listinfo/infinispan-dev
>>>
>>> --
>>> Manik Surtani
>>> manik(a)jboss.org
>>> Lead, Infinispan
>>> Lead, JBoss Cache
>>>
http://www.infinispan.org
>>>
http://www.jbosscache.org
>>>
>>>
>>>
>>>
>>>
>>> _______________________________________________
>>> infinispan-dev mailing list
>>> infinispan-dev(a)lists.jboss.org
>>>
https://lists.jboss.org/mailman/listinfo/infinispan-dev
>>>
>> _______________________________________________
>> infinispan-dev mailing list
>> infinispan-dev(a)lists.jboss.org
>>
https://lists.jboss.org/mailman/listinfo/infinispan-dev
>
> --
> Manik Surtani
> manik(a)jboss.org
> Lead, Infinispan
> Lead, JBoss Cache
>
http://www.infinispan.org
>
http://www.jbosscache.org
>
>
>
>
>
> _______________________________________________
> infinispan-dev mailing list
> infinispan-dev(a)lists.jboss.org
>
https://lists.jboss.org/mailman/listinfo/infinispan-dev
--
Galder Zamarreño
Sr. Software Engineer
Infinispan, JBoss Cache