[infinispan-dev] [ISPN-116] Async cache store: aggregation of multiple changes on a single key

Amin Abbaspour a.abbaspour at gmail.com
Sun Jul 26 08:20:08 EDT 2009


Hi Galder,

I checked out latest version (from subversion trunk) and used it for my test.

Here is config in xml:
---
<?xml version="1.0" encoding="UTF-8"?>

<!-- Infinispan 4.0 configuration defining a single named cache, "accounts",
     backed by the custom AccountJdbcCacheStore. -->
<infinispan xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="urn:infinispan:config:4.0">

    <!-- A transport is configured here, i.e. this variant is clustered.
         NOTE(review): the programmatic setup shown later in this message uses
         GlobalConfiguration.getNonClusteredDefault(), so the two are not equivalent. -->
    <global>
        <transport clusterName="SmppGatewayCluster"/>
    </global>

    <namedCache name="accounts">

        <!-- FIFO eviction capped at 100 entries. -->
        <eviction wakeUpInterval="5000" maxEntries="100" strategy="FIFO"/>

        <!-- NOTE(review): lifespan (600) is shorter than maxIdle (1000) and both differ
             from the 10000 used in the programmatic setup; confirm units and intent. -->
        <expiration lifespan="600" maxIdle="1000"/>

        <!-- Store is not shared, passivation is off, and the cache is preloaded from the store on start. -->
        <loaders passivation="false" shared="false" preload="true">

            <!-- Custom JDBC store; modifications are not ignored, so writes should reach storeLockSafe(). -->
            <loader
class="com.magfa.gateway.dao.infinispan.jdbcstore.AccountJdbcCacheStore"
fetchPersistentState="true"
                    ignoreModifications="false" purgeOnStartup="true">

                <properties/>

                <!-- NOTE(review): singleton store is enabled only in this XML variant; presumably
                     only the cluster coordinator then writes to the store, which may be related
                     to storeLockSafe() not being called. Verify against the Infinispan docs. -->
                <singletonStore enabled="true"
pushStateWhenCoordinator="true" pushStateTimeout="20000"/>

                <!-- Asynchronous store enabled with a pool of 5 threads. -->
                <async enabled="true" threadPoolSize="5"/>
            </loader>
        </loaders>

    </namedCache>

</infinispan>

---
And here is my similar config code:

---
       // Programmatic counterpart of the XML configuration above: builds the cache
       // configuration, attaches the custom JDBC store, and creates the cache manager.
       Configuration config = new Configuration();

       // NOTE(review): non-clustered global configuration, whereas the XML variant
       // declares a transport (clusterName="SmppGatewayCluster").
       GlobalConfiguration globalConfiguration =
GlobalConfiguration.getNonClusteredDefault();

       // Enable the async store; no other async setting (e.g. threadPoolSize, which the
       // XML sets to 5) is changed here.
       AsyncStoreConfig asyncStoreConfig = new AsyncStoreConfig();
       asyncStoreConfig.setEnabled(true);

       // Custom store configuration carrying the async settings.
       // NOTE(review): AccountJdbcCacheStoreConfig.clone() returns a fresh instance, so
       // these two settings are lost if the framework clones the configuration.
       CacheStoreConfig cacheStoreConfig = new AccountJdbcCacheStoreConfig();
       cacheStoreConfig.setAsyncStoreConfig(asyncStoreConfig);
       cacheStoreConfig.setPurgeSynchronously(false);

       CacheLoaderManagerConfig clmc = new CacheLoaderManagerConfig();

       // Register the single store with the loader manager.
       List<CacheLoaderConfig> cacheStoreConfigs = new
ArrayList<CacheLoaderConfig>(1);
       cacheStoreConfigs.add(cacheStoreConfig);

       clmc.setCacheLoaderConfigs(cacheStoreConfigs);

       config.setCacheLoaderManagerConfig(clmc);

       // FIFO eviction capped at 100 entries; expiration lifespan and max-idle are both
       // 10000 and the eviction wake-up interval is 60 * 1000 (presumably milliseconds - confirm).
       config.setEvictionStrategy(EvictionStrategy.FIFO);
       config.setExpirationLifespan(10000);
       config.setExpirationMaxIdle(10000);
       config.setEvictionMaxEntries(100);
       config.setEvictionWakeUpInterval(60 * 1000);

       // NOTE(review): INVALIDATION_ASYNC is a clustered cache mode, yet the global
       // configuration above is the non-clustered default; confirm this is intended.
       config.setCacheMode(Configuration.CacheMode.INVALIDATION_ASYNC);
// todo: important

       //CacheStoreConfig
       CacheManager manager = new
DefaultCacheManager(globalConfiguration, config);
---

and here is my custom store:
---
public class AccountJdbcCacheStore extends LockSupportCacheStore {

   private static final Logger log =
Logger.getLogger(AccountJdbcCacheStore.class);

   private final ConnectionPool pool;

   // todo; why constructor is called twice?
   public AccountJdbcCacheStore() {

       Properties properties = new Properties();
       properties.setProperty("driver", "com.mysql.jdbc.Driver");
       properties.setProperty("url",
"jdbc:mysql://dbsrv:3306/accounts?autoReconnect=true");
       properties.setProperty("user", "*****");
       properties.setProperty("password", "*****");

       pool = new ProxoolConnectionPool(properties);
   }

   protected void clearLockSafe() throws CacheLoaderException {
       throw new CacheLoaderException("Will not clear AccountBillingTable");
   }

   private static final String LOAD_ACCOUNTS_QUERY = "SELECT * FROM
accountsBilling";

   protected Set<InternalCacheEntry> loadAllLockSafe() throws
CacheLoaderException {

       log.trace("loadAllLockSafe()");

       Connection connection = null;
       Statement st = null;
       ResultSet rs = null;

       try {
           connection = pool.getConnection();

           st = connection.createStatement();
           rs = st.executeQuery(LOAD_ACCOUNTS_QUERY); // todo: where enable=1

           final Set<InternalCacheEntry> results = new
HashSet<InternalCacheEntry>();

           while (rs.next()) {
               results.add(InternalEntryFactory.create(rs.getLong("id"),
EngineCompatibleBillingBackend.fetchFromResultSet(rs)));
           }

           return results;

       } catch (SQLException e) {
           throw new RuntimeException("Exception in loading accounts
from good old accountsBilling ", e);
       } finally {
           pool.attemptClose(rs);
           pool.attemptClose(st);
           pool.attemptClose(connection);
       }

   }

   protected void toStreamLockSafe(ObjectOutput oos) throws
CacheLoaderException {
       throw new CacheLoaderException("toStreamLockSafe(); What should
i do with ObjectOutput: " + oos);
   }

   protected void fromStreamLockSafe(ObjectInput ois) throws
CacheLoaderException {
       throw new CacheLoaderException("fromStreamLockSafe(); What
should i do with ObjectInput: " + ois);
   }

   protected boolean removeLockSafe(Object key, String lockingKey)
throws CacheLoaderException {
       log.trace("removeLockSafe(); will not remove anything from
AccountsTable with key: " + key + ", lockingKey: " + lockingKey);
       return true;
   }

   private final static double OUTGOING_FARSI_COST_BASE = 71.2d;
   private final static double OUTGOING_ENGLISH_COST_BASE = 177.6d;
   private final static String UPDATE_ACCOUNT_QUERY;

   static {
       final String TOTAL_MESSAGE_COST_FORMULA = "(?*(farsiCost+" +
OUTGOING_FARSI_COST_BASE + ") + (?*(englishCost+"
               + OUTGOING_ENGLISH_COST_BASE + "))+(?*incomingCost))";

       UPDATE_ACCOUNT_QUERY = "UPDATE accountsBilling SET
totalSentMessages = totalSentMessages + ?, " +
               "totalReceivedMessages = totalReceivedMessages + ?,
farsiSentMessages = farsiSentMessages + ? , " +
               "englishSentMessages = englishSentMessages + ?, " +
               "usedCredit2 = usedCredit2 + " +
TOTAL_MESSAGE_COST_FORMULA + ',' +
               "credit2 = credit2 - " + TOTAL_MESSAGE_COST_FORMULA + ' ' +
               "WHERE id=?";
   }

   private static final AtomicInteger storeCounter = new AtomicInteger();

   protected void storeLockSafe(InternalCacheEntry ed, String
lockingKey) throws CacheLoaderException {

       final Account account = (Account) ed.getValue();

       log.info("-------> storeLockSafe(" +
storeCounter.incrementAndGet() + "); storing account: " + account + ",
lockingKey: " + lockingKey);

       final long sent = account.getSentAndReset();
       final long received = account.getReceivedAndReset();
       final long dlr = account.getDlrAndReset();
       final long farsiSent = account.getFarsiSentAndReset();
       final long englishSent = account.getEnglishSentAndReset();

       Connection connection = null;
       PreparedStatement st = null;

       try {
           connection = pool.getConnection();
           st = connection.prepareStatement(UPDATE_ACCOUNT_QUERY);

           st.setLong(1, sent);
           st.setLong(2, received);
           st.setLong(3, farsiSent);
           st.setLong(4, englishSent);
           st.setLong(5, farsiSent);
           st.setLong(6, englishSent);
           st.setLong(7, received);
           st.setLong(8, farsiSent);
           st.setLong(9, englishSent);
           st.setLong(10, received);
           st.setLong(11, account.getId());

           st.executeUpdate();
       } catch (SQLException e) {
           throw new RuntimeException("Exception in updating
accountsBilling from engine DB", e);
       } finally {
           pool.attemptClose(st);
           pool.attemptClose(connection);
       }
   }

   protected InternalCacheEntry loadLockSafe(Object key, String
lockingKey) throws CacheLoaderException {

       log.info("loadLockSafe(), key: " + key + ", lockingKey: " + lockingKey);

       Connection conn = null;
       Statement st = null;
       ResultSet rs = null;

       try {
           conn = pool.getConnection();
           st = conn.createStatement();
           rs = st.executeQuery("SELECT * FROM accountsBilling WHERE
id=" + lockingKey);

           if (rs.next()) {
               return InternalEntryFactory.create(key,
EngineCompatibleBillingBackend.fetchFromResultSet(rs));
           }

           return null;
       } catch (SQLException e) {
           final String message = "SQL error while fetching strored
entry with key:" + key + " lockingKey: " + lockingKey;
           log.error(message, e);
           throw new CacheLoaderException(message, e);
       } finally {
           pool.attemptClose(rs);
           pool.attemptClose(st);
           pool.attemptClose(conn);
       }
   }

   protected String getLockFromKey(Object key) throws CacheLoaderException {
       return key.toString();
   }

   public Class<? extends CacheLoaderConfig> getConfigurationClass() {
       return AccountJdbcCacheStoreConfig.class;
   }
}
----
and custom config:
---
public class AccountJdbcCacheStoreConfig extends LockSupportCacheStoreConfig {

   private static final Logger log =
Logger.getLogger(AccountJdbcCacheStoreConfig.class);

   public LockSupportCacheStoreConfig clone() {
       return new AccountJdbcCacheStoreConfig();
   }

   public String getCacheLoaderClassName() {
       return AccountJdbcCacheStore.class.getName();
   }

   public void setCacheLoaderClassName(String s) {
   }
}
---


but I still have two issues:

1. Using config via code, the flush to the DB works, but there is still no
aggregation: for 100 events I get 100 store calls.
2. Using the XML config, storeLockSafe is never called at all.

Thanks for your help,
Amin Abbaspour

On Mon, Jul 20, 2009 at 7:09 PM, Galder
Zamarreno<galder.zamarreno at redhat.com> wrote:
>
>
> On 07/20/2009 02:39 PM, Manik Surtani wrote:
>>
>> On 20 Jul 2009, at 12:35, Amin Abbaspour wrote:
>>
>>> Hi,
>>>
>>> Thanks Galder. I will test CoalescedAsyncStore ASAP and let you know.
>>>
>>> BTW Manik, I think the change is so much that we'd better see this as
>>> a completely new store and later replace AsyncCacheStore or keep them
>>> both.
>>
>> I agree that the impl is significantly different, but from a
>> functional/feature standpoint, what does it offer over and above the
>> AsyncCacheStore, except that it just does the same thing "better" ?  :-)
>
> I agree with Manik. I would have only left the other implementation if there
> was anything in particular the past implementation offered versus the
> coalesced one. My perf tests indicated that we're as fast as before but with
> the added bonus of lesser cache store calls, so that looks to me a win/win
> situation.
>
> I've now committed https://jira.jboss.org/jira/browse/ISPN-116 and added an
> FAQ entry to http://www.jboss.org/community/wiki/InfinispanTechnicalFAQs
>
> I'll talk about this further on a blog entry later this week.
>
> FAO Amin, if you wanna try it out, checkout latest from trunk and let us
> know what you think. No need for configuration changes, simply add <async
> enabled="true" /> and that should be enough.
>
>>
>>
>>> Regards,
>>> Amin Abbaspour
>>>
>>> On Mon, Jul 20, 2009 at 2:47 PM, Manik Surtani<manik at jboss.org>  wrote:
>>>>
>>>> Nope, never got the email below.
>>>>
>>>> Anyway, I always thought the intention was to enhance or improve the
>>>> AsyncCacheStore, not to create another one.
>>>>
>>>> Cheers
>>>> Manik
>>>>
>>>> On 20 Jul 2009, at 10:55, Galder Zamarreno wrote:
>>>>
>>>>> Did other list readers ever get this message?
>>>>>
>>>>> I did send it last week but never actually got it via the dev
>>>>> list, so
>>>>> double checking...
>>>>>
>>>>> On 07/15/2009 07:58 PM, Galder Zamarreno wrote:
>>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> As part of this JIRA, I'm thinking whether we wanna leave the
>>>>>> current
>>>>>> queue based async store.
>>>>>>
>>>>>> For the work I've done, I've created a new class called
>>>>>> CoalescedAsyncStore that works in the way agreed below, iow,
>>>>>> instead of
>>>>>> queueing mods, it keeps a ConcurrentMap which is swapped by a
>>>>>> new
>>>>>> instance when the async thread is going to apply the changes.
>>>>>>
>>>>>> I've run some perf tests between CoalescedAsyncStore and AsyncStore
>>>>>> and
>>>>>> when doing 1 million store where each key is different, the
>>>>>> performance
>>>>>> is fairly similar. The difference comes when trying to run a
>>>>>> similar
>>>>>> test where the same key is always updated, this results in lesser
>>>>>> underlying store calls and hence CoalescedAsyncStore solution is
>>>>>> faster
>>>>>> here.
>>>>>>
>>>>>> So, rather than keeping both, I'd suggest replacing AsyncStore with
>>>>>> the
>>>>>> impl in CoalescedAsyncStore. We also need to look at the
>>>>>> configuration
>>>>>> for the new AsyncStore:
>>>>>>
>>>>>> Configuration wise in AsyncStoreConfig, I'd leave threadPoolSize,
>>>>>> enabled and add the read/write lock acquisition timeout. The rest
>>>>>> I'd
>>>>>> get rid as no longer apply:
>>>>>>
>>>>>> int batchSize = 100;
>>>>>> long pollWait = 100;
>>>>>> int queueSize = 10000;
>>>>>>
>>>>>> By default, I'd give 5000ms to the read/write lock acquisition
>>>>>> timeout.
>>>>>>
>>>>>> Thoughts?
>>>>>>
>>>>>> On 07/09/2009 01:08 PM, Manik Surtani wrote:
>>>>>>>
>>>>>>> On 9 Jul 2009, at 11:37, Galder Zamarreno wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> On 07/09/2009 11:55 AM, Manik Surtani wrote:
>>>>>>>>>
>>>>>>>>> On 8 Jul 2009, at 19:53, Jason T. Greene wrote:
>>>>>>>>>
>>>>>>>>>> Manik Surtani wrote:
>>>>>>>>>>>
>>>>>>>>>>> * Make the concurrent map volatile
>>>>>>>>>>> * When iterating, first create a new ConcurrentMap and replace
>>>>>>>>>>> the
>>>>>>>>>>> old one with the new one so all concurrent threads write to
>>>>>>>>>>> the new
>>>>>>>>>>> Map
>>>>>>>>>>> * Iterate over the old map
>>>>>>>>>>
>>>>>>>>>> That would lead to race conditions since a concurrent writing
>>>>>>>>>> thread
>>>>>>>>>> could write to the "old" map, either by getting a recent yet
>>>>>>>>>> incorrect
>>>>>>>>>> read off the volatile, or reading it right before it changes.
>>>>>>>>>
>>>>>>>>> True, since referencing the map and writing to it isn't atomic.
>>>>>>>>>
>>>>>>>>> We could guard access to the map with a read/write lock. Safe,
>>>>>>>>> if a
>>>>>>>>> little heavy-handed... map writers would acquire a RL (since we
>>>>>>>>> want
>>>>>>>>> concurrent access here) but the async flushing thread would need
>>>>>>>>> to
>>>>>>>>> wait
>>>>>>>>> for a WL to swap the map reference, releasing the lock after the
>>>>>>>>> map
>>>>>>>>> reference has been swapped.
>>>>>>>>
>>>>>>>> Yeah, I don't think it could be volatile because it only
>>>>>>>> applies to
>>>>>>>> the variable itself, not the contents pointed at. The R/W lock
>>>>>>>> approach looks like a valid one and better than using
>>>>>>>> synchronized
>>>>>>>> blocks. Another thing I had in my mind is whether we need the
>>>>>>>> copy to
>>>>>>>> be a ConcurrentMap, we could probably just use a
>>>>>>>> Immutables.immutableMapCopy().
>>>>>>>
>>>>>>> Sync blocks would suck since it is effectively an exclusive
>>>>>>> lock. We
>>>>>>> want non-exclusive (read) locks for threads writing to this map
>>>>>>> - we
>>>>>>> want multiple application threads to do this concurrently
>>>>>>> otherwise this
>>>>>>> becomes a bottleneck. And hence the need for a ConcurrentMap.
>>>>>>>
>>>>>>> And we don't make a copy at all - all the async flushing thread
>>>>>>> does is
>>>>>>> that it creates a new, empty ConcurrentMap and swaps the 'async
>>>>>>> queue'
>>>>>>> Map so that application threads can continue logging their changes
>>>>>>> somewhere while old changes can be iterated through and flushed.
>>>>>>> And for
>>>>>>> this, the field still needs to be volatile (not strictly so, since
>>>>>>> we
>>>>>>> can rely on the fencing that will happen in the lock as a side-
>>>>>>> effect)
>>>>>>>
>>>>>>>> I looked at the two implementations (ConcurrentHashMap(Map)
>>>>>>>> constructor vs Immutables.immutableMapCopy() which would
>>>>>>>> Object.clone() on the map) but I'm not sure which one would be
>>>>>>>> faster.
>>>>>>>> Since clone() would rely on a native clone() impl, I'd imagine
>>>>>>>> that to
>>>>>>>> be faster.
>>>>>>>
>>>>>>> Like I said, we don't need a copy. We need a new, empty map for
>>>>>>> threads
>>>>>>> to log changes to while the old one's being flushed.
>>>>>>>
>>>>>>>>> --
>>>>>>>>> Manik Surtani
>>>>>>>>> manik at jboss.org
>>>>>>>>> Lead, Infinispan
>>>>>>>>> Lead, JBoss Cache
>>>>>>>>> http://www.infinispan.org
>>>>>>>>> http://www.jbosscache.org
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> _______________________________________________
>>>>>>>>> infinispan-dev mailing list
>>>>>>>>> infinispan-dev at lists.jboss.org
>>>>>>>>> https://lists.jboss.org/mailman/listinfo/infinispan-dev
>>>>>>>>
>>>>>>>> --
>>>>>>>> Galder Zamarreño
>>>>>>>> Sr. Software Engineer
>>>>>>>> Infinispan, JBoss Cache
>>>>>>>
>>>>>>> --
>>>>>>> Manik Surtani
>>>>>>> manik at jboss.org
>>>>>>> Lead, Infinispan
>>>>>>> Lead, JBoss Cache
>>>>>>> http://www.infinispan.org
>>>>>>> http://www.jbosscache.org
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>> --
>>>>> Galder Zamarreño
>>>>> Sr. Software Engineer
>>>>> Infinispan, JBoss Cache
>>>>> _______________________________________________
>>>>> infinispan-dev mailing list
>>>>> infinispan-dev at lists.jboss.org
>>>>> https://lists.jboss.org/mailman/listinfo/infinispan-dev
>>>>
>>>> --
>>>> Manik Surtani
>>>> manik at jboss.org
>>>> Lead, Infinispan
>>>> Lead, JBoss Cache
>>>> http://www.infinispan.org
>>>> http://www.jbosscache.org
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> _______________________________________________
>>>> infinispan-dev mailing list
>>>> infinispan-dev at lists.jboss.org
>>>> https://lists.jboss.org/mailman/listinfo/infinispan-dev
>>>>
>>> _______________________________________________
>>> infinispan-dev mailing list
>>> infinispan-dev at lists.jboss.org
>>> https://lists.jboss.org/mailman/listinfo/infinispan-dev
>>
>> --
>> Manik Surtani
>> manik at jboss.org
>> Lead, Infinispan
>> Lead, JBoss Cache
>> http://www.infinispan.org
>> http://www.jbosscache.org
>>
>>
>>
>>
>>
>> _______________________________________________
>> infinispan-dev mailing list
>> infinispan-dev at lists.jboss.org
>> https://lists.jboss.org/mailman/listinfo/infinispan-dev
>
> --
> Galder Zamarreño
> Sr. Software Engineer
> Infinispan, JBoss Cache
>




More information about the infinispan-dev mailing list