[infinispan-commits] Infinispan SVN: r618 - in trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc: binary and 2 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Fri Jul 24 08:54:11 EDT 2009


Author: mircea.markus
Date: 2009-07-24 08:54:10 -0400 (Fri, 24 Jul 2009)
New Revision: 618

Added:
   trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/DataManiulationHelper.java
Modified:
   trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/binary/JdbcBinaryCacheStore.java
   trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/mixed/JdbcMixedCacheStore.java
   trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/stringbased/JdbcStringBasedCacheStore.java
Log:
[ISPN-88] (reuse code between BinaryCacheStore and StringBasedCacheStore) - impemented

Added: trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/DataManiulationHelper.java
===================================================================
--- trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/DataManiulationHelper.java	                        (rev 0)
+++ trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/DataManiulationHelper.java	2009-07-24 12:54:10 UTC (rev 618)
@@ -0,0 +1,167 @@
+package org.infinispan.loaders.jdbc;
+
+import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.loaders.CacheLoaderException;
+import org.infinispan.loaders.jdbc.connectionfactory.ConnectionFactory;
+import org.infinispan.marshall.Marshaller;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * The purpose of this class is to factorize the repeating code between {@link org.infinispan.loaders.jdbc.stringbased.JdbcStringBasedCacheStore}
+ * and {@link org.infinispan.loaders.jdbc.binary.JdbcBinaryCacheStore}. This class implements GOF's template method pattern.
+ *
+ * @author Mircea.Markus at jboss.com
+ */
+public abstract class DataManiulationHelper {
+
+   private static Log log = LogFactory.getLog(DataManiulationHelper.class);
+
+   private ConnectionFactory connectionFactory;
+   private TableManipulation tableManipulation;
+   protected Marshaller marshaller;
+
+
+   public DataManiulationHelper(ConnectionFactory connectionFactory, TableManipulation tableManipulation, Marshaller marshaller) {
+      this.connectionFactory = connectionFactory;
+      this.tableManipulation = tableManipulation;
+      this.marshaller = marshaller;
+   }
+
+   public void clear() throws CacheLoaderException {
+      Connection conn = null;
+      PreparedStatement ps = null;
+      try {
+         String sql = tableManipulation.getDeleteAllRowsSql();
+         conn = connectionFactory.getConnection();
+         ps = conn.prepareStatement(sql);
+         int result = ps.executeUpdate();
+         if (log.isTraceEnabled())
+            log.trace("Successfully removed " + result + " rows.");
+      } catch (SQLException ex) {
+         logAndThrow(ex, "Failed clearing JdbcBinaryCacheStore");
+      } finally {
+         JdbcUtil.safeClose(ps);
+         connectionFactory.releaseConnection(conn);
+      }
+   }
+
+
+   public final void fromStreamSupport(ObjectInput objectInput) throws CacheLoaderException {
+      Connection conn = null;
+      PreparedStatement ps = null;
+      try {
+         conn = connectionFactory.getConnection();
+         String sql = tableManipulation.getInsertRowSql();
+         ps = conn.prepareStatement(sql);
+
+         int readCount = 0;
+         int batchSize = tableManipulation.getBatchSize();
+
+         Object objFromStream = marshaller.objectFromObjectStream(objectInput);
+         while (fromStreamProcess(objFromStream, ps, objectInput)) {
+            ps.addBatch();
+            readCount++;
+            if (readCount % batchSize == 0) {
+               ps.executeBatch();
+               if (log.isTraceEnabled())
+                  log.trace("Executing batch " + (readCount / batchSize) + ", batch size is " + batchSize);
+            }
+            objFromStream = marshaller.objectFromObjectStream(objectInput);
+         }
+         if (readCount % batchSize != 0)
+            ps.executeBatch();//flush the batch
+         if (log.isTraceEnabled())
+            log.trace("Successfully inserted " + readCount + " buckets into the database, batch size is " + batchSize);
+      } catch (IOException ex) {
+         logAndThrow(ex, "I/O failure while integrating state into store");
+      } catch (SQLException e) {
+         logAndThrow(e, "SQL failure while integrating state into store");
+      } catch (ClassNotFoundException e) {
+         logAndThrow(e, "Unexpected failure while integrating state into store");
+      } finally {
+         JdbcUtil.safeClose(ps);
+         connectionFactory.releaseConnection(conn);
+      }
+   }
+
+
+   public final void toStreamSupport(ObjectOutput objectOutput, byte streamDelimiter) throws CacheLoaderException {
+      //now write our data
+      Connection connection = null;
+      PreparedStatement ps = null;
+      ResultSet rs = null;
+      try {
+         String sql = tableManipulation.getLoadAllRowsSql();
+         if (log.isTraceEnabled()) log.trace("Running sql '" + sql);
+         connection = connectionFactory.getConnection();
+         ps = connection.prepareStatement(sql);
+         rs = ps.executeQuery();
+         rs.setFetchSize(tableManipulation.getFetchSize());
+         while (rs.next()) {
+            InputStream is = rs.getBinaryStream(1);
+            toStreamProcess(rs, is, objectOutput);
+         }
+         marshaller.objectToObjectStream(streamDelimiter, objectOutput);
+      } catch (SQLException e) {
+         logAndThrow(e, "SQL Error while storing string keys to database");
+      } catch (IOException e) {
+         logAndThrow(e, "I/O Error while storing string keys to database");
+      }
+      finally {
+         JdbcUtil.safeClose(rs);
+         JdbcUtil.safeClose(ps);
+         connectionFactory.releaseConnection(connection);
+      }
+
+   }
+
+
+   public final Set<InternalCacheEntry> loadAllSupport() throws CacheLoaderException {
+      Connection conn = null;
+      PreparedStatement ps = null;
+      ResultSet rs = null;
+      try {
+         String sql = tableManipulation.getLoadAllRowsSql();
+         conn = connectionFactory.getConnection();
+         ps = conn.prepareStatement(sql);
+         rs = ps.executeQuery();
+         rs.setFetchSize(tableManipulation.getFetchSize());
+         Set<InternalCacheEntry> result = new HashSet<InternalCacheEntry>();
+         while (rs.next()) {
+            loadAllProcess(rs, result);
+         }
+         return result;
+      } catch (SQLException e) {
+         String message = "SQL error while fetching all StoredEntries";
+         log.error(message, e);
+         throw new CacheLoaderException(message, e);
+      } finally {
+         JdbcUtil.safeClose(rs);
+         JdbcUtil.safeClose(ps);
+         connectionFactory.releaseConnection(conn);
+      }
+   }
+
+   public abstract void loadAllProcess(ResultSet rs, Set<InternalCacheEntry> result) throws SQLException, CacheLoaderException;
+
+   public abstract void toStreamProcess(ResultSet rs, InputStream is, ObjectOutput objectOutput) throws CacheLoaderException, SQLException, IOException;
+
+   public abstract boolean fromStreamProcess(Object objFromStream, PreparedStatement ps, ObjectInput objectInput) throws SQLException, CacheLoaderException, IOException, ClassNotFoundException;
+
+   public static void logAndThrow(Exception e, String message) throws CacheLoaderException {
+      log.error(message, e);
+      throw new CacheLoaderException(message, e);
+   }
+}


Property changes on: trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/DataManiulationHelper.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Modified: trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/binary/JdbcBinaryCacheStore.java
===================================================================
--- trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/binary/JdbcBinaryCacheStore.java	2009-07-24 09:13:27 UTC (rev 617)
+++ trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/binary/JdbcBinaryCacheStore.java	2009-07-24 12:54:10 UTC (rev 618)
@@ -9,6 +9,7 @@
 import org.infinispan.loaders.bucket.BucketBasedCacheStore;
 import org.infinispan.loaders.jdbc.JdbcUtil;
 import org.infinispan.loaders.jdbc.TableManipulation;
+import org.infinispan.loaders.jdbc.DataManiulationHelper;
 import org.infinispan.loaders.jdbc.connectionfactory.ConnectionFactory;
 import org.infinispan.marshall.Marshaller;
 import org.infinispan.util.logging.Log;
@@ -51,6 +52,7 @@
    private JdbcBinaryCacheStoreConfig config;
    private ConnectionFactory connectionFactory;
    private TableManipulation tableManipulation;
+   private DataManiulationHelper dmHelper;
 
    public void init(CacheLoaderConfig config, Cache cache, Marshaller m) {
       if (log.isTraceEnabled())
@@ -67,6 +69,36 @@
          factory.start(config.getConnectionFactoryConfig());
          doConnectionFactoryInitialization(factory);
       }
+      dmHelper = new DataManiulationHelper(connectionFactory, tableManipulation, marshaller) {
+         @Override
+         public void loadAllProcess(ResultSet rs, Set<InternalCacheEntry> result) throws SQLException, CacheLoaderException {
+            InputStream binaryStream = rs.getBinaryStream(1);
+            Bucket bucket = (Bucket) JdbcUtil.unmarshall(getMarshaller(), binaryStream);
+            result.addAll(bucket.getStoredEntries());
+         }
+
+         @Override
+         public void toStreamProcess(ResultSet rs, InputStream is, ObjectOutput objectOutput) throws CacheLoaderException, SQLException, IOException {
+            InputStream inputStream = rs.getBinaryStream(1);
+            Bucket bucket = (Bucket) JdbcUtil.unmarshall(getMarshaller(), inputStream);
+            String bucketName = rs.getString(2);
+            marshaller.objectToObjectStream(bucketName, objectOutput);
+            marshaller.objectToObjectStream(bucket, objectOutput);
+         }
+
+         public boolean fromStreamProcess(Object bucketName, PreparedStatement ps, ObjectInput objectInput) throws SQLException, CacheLoaderException, IOException, ClassNotFoundException {
+            if (bucketName instanceof String) {
+               Bucket bucket = (Bucket) marshaller.objectFromObjectStream(objectInput);
+               ByteBuffer buffer = JdbcUtil.marshall(getMarshaller(), bucket);
+               ps.setBinaryStream(1, buffer.getStream(), buffer.getLength());
+               ps.setLong(2, bucket.timestampOfFirstEntryToExpire());
+               ps.setString(3, (String) bucketName);
+               return true;
+            } else {
+               return false;
+            }
+         }
+      };
    }
 
    public void stop() throws CacheLoaderException {
@@ -95,7 +127,7 @@
             throw new CacheLoaderException("Unexpected insert result: '" + insertedRows + "'. Expected values is 1");
          }
       } catch (SQLException ex) {
-         logAndThrow(ex, "sql failure while inserting bucket: " + bucket);
+         DataManiulationHelper.logAndThrow(ex, "sql failure while inserting bucket: " + bucket);
       } finally {
          JdbcUtil.safeClose(ps);
          connectionFactory.releaseConnection(conn);
@@ -121,7 +153,7 @@
             throw new CacheLoaderException("Unexpected  update result: '" + updatedRows + "'. Expected values is 1");
          }
       } catch (SQLException e) {
-         logAndThrow(e, "sql failure while updating bucket: " + bucket);
+         DataManiulationHelper.logAndThrow(e, "sql failure while updating bucket: " + bucket);
       } finally {
          JdbcUtil.safeClose(ps);
          connectionFactory.releaseConnection(conn);
@@ -159,127 +191,23 @@
    }
 
    public Set<InternalCacheEntry> loadAllLockSafe() throws CacheLoaderException {
-      Connection conn = null;
-      PreparedStatement ps = null;
-      ResultSet rs = null;
-      try {
-         String sql = tableManipulation.getLoadAllRowsSql();
-         if (log.isTraceEnabled()) {
-            log.trace("Running loadAll. Sql: '" + sql + "'");
-         }
-         conn = connectionFactory.getConnection();
-         ps = conn.prepareStatement(sql);
-         rs = ps.executeQuery();
-         rs.setFetchSize(config.getFetchSize());
-         Set<InternalCacheEntry> result = new HashSet<InternalCacheEntry>();
-         while (rs.next()) {
-            InputStream binaryStream = rs.getBinaryStream(1);
-            Bucket bucket = (Bucket) JdbcUtil.unmarshall(getMarshaller(), binaryStream);
-            result.addAll(bucket.getStoredEntries());
-         }
-         return result;
-      } catch (SQLException e) {
-         String message = "sql failure while loading key: ";
-         log.error(message, e);
-         throw new CacheLoaderException(message, e);
-      } finally {
-         JdbcUtil.safeClose(rs);
-         JdbcUtil.safeClose(ps);
-         connectionFactory.releaseConnection(conn);
-      }
+      return dmHelper.loadAllSupport();
    }
 
    protected void fromStreamLockSafe(ObjectInput objectInput) throws CacheLoaderException {
-      Connection conn = null;
-      PreparedStatement ps = null;
-      try {
-         conn = connectionFactory.getConnection();
-         String sql = tableManipulation.getInsertRowSql();
-         ps = conn.prepareStatement(sql);
-
-         int readBuckets = 0;
-         int batchSize = config.getBatchSize();
-         Object bucketName = marshaller.objectFromObjectStream(objectInput);
-         while (bucketName instanceof String) {
-            Bucket bucket = (Bucket) marshaller.objectFromObjectStream(objectInput);
-            readBuckets++;
-            ByteBuffer buffer = JdbcUtil.marshall(getMarshaller(), bucket);
-            ps.setBinaryStream(1, buffer.getStream(), buffer.getLength());
-            ps.setLong(2, bucket.timestampOfFirstEntryToExpire());
-            ps.setString(3, (String) bucketName);
-            if (readBuckets % batchSize == 0) {
-               ps.executeBatch();
-               if (log.isTraceEnabled())
-                  log.trace("Executing batch " + (readBuckets / batchSize) + ", batch size is " + batchSize);
-            } else {
-               ps.addBatch();
-            }
-            bucketName = marshaller.objectFromObjectStream(objectInput);
-         }
-         if (readBuckets % batchSize != 0)
-            ps.executeBatch();//flush the batch
-         if (log.isTraceEnabled())
-            log.trace("Successfully inserted " + readBuckets + " buckets into the database, batch size is " + batchSize);
-      } catch (IOException ex) {
-         logAndThrow(ex, "I/O failure while integrating state into store");
-      } catch (SQLException e) {
-         logAndThrow(e, "SQL failure while integrating state into store");
-      } catch (ClassNotFoundException e) {
-         logAndThrow(e, "Unexpected failure while integrating state into store");
-      } finally {
-         JdbcUtil.safeClose(ps);
-         connectionFactory.releaseConnection(conn);
-      }
+      dmHelper.fromStreamSupport(objectInput);
    }
 
    protected void toStreamLockSafe(ObjectOutput objectOutput) throws CacheLoaderException {
-      Connection conn = null;
-      PreparedStatement ps = null;
-      ResultSet rs = null;
-      try {
-         conn = connectionFactory.getConnection();
-         String sql = tableManipulation.getLoadAllRowsSql();
-         ps = conn.prepareStatement(sql);
-         rs = ps.executeQuery();
-         rs.setFetchSize(config.getFetchSize());
-         while (rs.next()) {
-            InputStream inputStream = rs.getBinaryStream(1);
-            Bucket bucket = (Bucket) JdbcUtil.unmarshall(getMarshaller(), inputStream);
-            String bucketName = rs.getString(2);
-            marshaller.objectToObjectStream(bucketName, objectOutput);
-            marshaller.objectToObjectStream(bucket, objectOutput);
-         }
-         marshaller.objectToObjectStream(BINARY_STREAM_DELIMITER, objectOutput);
-      } catch (SQLException ex) {
-         logAndThrow(ex, "SQL failure while writing store's content to stream");
-      }
-      catch (IOException e) {
-         logAndThrow(e, "IO failure while writing store's content to stream");
-      } finally {
-         JdbcUtil.safeClose(rs);
-         JdbcUtil.safeClose(ps);
-         connectionFactory.releaseConnection(conn);
-      }
+      dmHelper.toStreamSupport(objectOutput, BINARY_STREAM_DELIMITER);
    }
 
+   @Override
    protected void clearLockSafe() throws CacheLoaderException {
-      Connection conn = null;
-      PreparedStatement ps = null;
-      try {
-         String sql = tableManipulation.getDeleteAllRowsSql();
-         conn = connectionFactory.getConnection();
-         ps = conn.prepareStatement(sql);
-         int result = ps.executeUpdate();
-         if (log.isTraceEnabled())
-            log.trace("Successfully removed " + result + " rows.");
-      } catch (SQLException ex) {
-         logAndThrow(ex, "Failed clearing JdbcBinaryCacheStore");
-      } finally {
-         JdbcUtil.safeClose(ps);
-         connectionFactory.releaseConnection(conn);
-      }
+      dmHelper.clear();
    }
 
+   @Override
    public void purgeInternal() throws CacheLoaderException {
       Connection conn = null;
       PreparedStatement ps = null;
@@ -309,7 +237,7 @@
          //if something happens make sure buckets locks are being release
          releaseLocks(expiredBuckets);
          connectionFactory.releaseConnection(conn);
-         logAndThrow(ex, "Failed clearing JdbcBinaryCacheStore");
+         DataManiulationHelper.logAndThrow(ex, "Failed clearing JdbcBinaryCacheStore");
       } finally {
          JdbcUtil.safeClose(ps);
          JdbcUtil.safeClose(rs);
@@ -353,7 +281,7 @@
          //if something happens make sure buckets locks are being release
          releaseLocks(emptyBuckets);
          connectionFactory.releaseConnection(conn);
-         logAndThrow(ex, "Failed clearing JdbcBinaryCacheStore");
+         DataManiulationHelper.logAndThrow(ex, "Failed clearing JdbcBinaryCacheStore");
       } finally {
          //release locks for the updated buckets.This won't include empty buckets, as these were migrated to emptyBuckets
          releaseLocks(expiredBuckets);
@@ -386,7 +314,7 @@
          }
       } catch (SQLException ex) {
          //if something happens make sure buckets locks are being release
-         logAndThrow(ex, "Failed clearing JdbcBinaryCacheStore");
+         DataManiulationHelper.logAndThrow(ex, "Failed clearing JdbcBinaryCacheStore");
       } finally {
          releaseLocks(emptyBuckets);
          JdbcUtil.safeClose(ps);
@@ -404,11 +332,6 @@
       return JdbcBinaryCacheStoreConfig.class;
    }
 
-   protected void logAndThrow(Exception e, String message) throws CacheLoaderException {
-      log.error(message, e);
-      throw new CacheLoaderException(message, e);
-   }
-
    public ConnectionFactory getConnectionFactory() {
       return connectionFactory;
    }

Modified: trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/mixed/JdbcMixedCacheStore.java
===================================================================
--- trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/mixed/JdbcMixedCacheStore.java	2009-07-24 09:13:27 UTC (rev 617)
+++ trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/mixed/JdbcMixedCacheStore.java	2009-07-24 12:54:10 UTC (rev 618)
@@ -62,10 +62,10 @@
       ConnectionFactoryConfig factoryConfig = config.getConnectionFactoryConfig();
       sharedConnectionFactory = ConnectionFactory.getConnectionFactory(factoryConfig.getConnectionFactoryClass());
       sharedConnectionFactory.start(factoryConfig);
+      binaryCacheStore.doConnectionFactoryInitialization(sharedConnectionFactory);
       binaryCacheStore.start();
-      binaryCacheStore.doConnectionFactoryInitialization(sharedConnectionFactory);
+      stringBasedCacheStore.doConnectionFactoryInitialization(sharedConnectionFactory);
       stringBasedCacheStore.start();
-      stringBasedCacheStore.doConnectionFactoryInitialization(sharedConnectionFactory);
    }
 
    @Override

Modified: trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/stringbased/JdbcStringBasedCacheStore.java
===================================================================
--- trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/stringbased/JdbcStringBasedCacheStore.java	2009-07-24 09:13:27 UTC (rev 617)
+++ trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/stringbased/JdbcStringBasedCacheStore.java	2009-07-24 12:54:10 UTC (rev 618)
@@ -7,6 +7,7 @@
 import org.infinispan.loaders.CacheLoaderConfig;
 import org.infinispan.loaders.CacheLoaderException;
 import org.infinispan.loaders.LockSupportCacheStore;
+import org.infinispan.loaders.jdbc.DataManiulationHelper;
 import org.infinispan.loaders.jdbc.JdbcUtil;
 import org.infinispan.loaders.jdbc.TableManipulation;
 import org.infinispan.loaders.jdbc.connectionfactory.ConnectionFactory;
@@ -22,7 +23,6 @@
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
-import java.util.HashSet;
 import java.util.Set;
 
 /**
@@ -53,6 +53,7 @@
    private Key2StringMapper key2StringMapper;
    private ConnectionFactory connectionFactory;
    private TableManipulation tableManipulation;
+   private DataManiulationHelper dmHelper;
 
    public void init(CacheLoaderConfig config, Cache cache, Marshaller m) {
       super.init(config, cache, m);
@@ -69,8 +70,40 @@
          doConnectionFactoryInitialization(connectionFactory);
       }
       this.key2StringMapper = config.getKey2StringMapper();
+      dmHelper = new DataManiulationHelper(connectionFactory, tableManipulation, marshaller) {
+
+         @Override
+         public void loadAllProcess(ResultSet rs, Set<InternalCacheEntry> result) throws SQLException, CacheLoaderException {
+            InputStream inputStream = rs.getBinaryStream(1);
+            InternalCacheValue icv = (InternalCacheValue) JdbcUtil.unmarshall(getMarshaller(), inputStream);
+            Object key = rs.getObject(2);
+            result.add(icv.toInternalCacheEntry(key));
+         }
+
+         @Override
+         public void toStreamProcess(ResultSet rs, InputStream is, ObjectOutput objectOutput) throws CacheLoaderException, SQLException, IOException {
+            InternalCacheValue icv = (InternalCacheValue) JdbcUtil.unmarshall(getMarshaller(), is);
+            Object key = rs.getObject(2);
+            marshaller.objectToObjectStream(icv.toInternalCacheEntry(key), objectOutput);
+         }
+
+         public boolean fromStreamProcess(Object objFromStream, PreparedStatement ps, ObjectInput objectInput) throws SQLException, CacheLoaderException {
+            if (objFromStream instanceof InternalCacheEntry) {
+               InternalCacheEntry se = (InternalCacheEntry) objFromStream;
+               String key = key2StringMapper.getStringMapping(se.getKey());
+               ByteBuffer buffer = JdbcUtil.marshall(getMarshaller(), se.toInternalCacheValue());
+               ps.setBinaryStream(1, buffer.getStream(), buffer.getLength());
+               ps.setLong(2, se.getExpiryTime());
+               ps.setString(3, key);
+               return true;
+            } else {
+               return false;
+            }
+         }
+      };
    }
 
+   @Override
    public void stop() throws CacheLoaderException {
       tableManipulation.stop();
       if (config.isManageConnectionFactory()) {
@@ -78,6 +111,7 @@
       }
    }
 
+   @Override
    protected String getLockFromKey(Object key) throws CacheLoaderException {
       if (!key2StringMapper.isSupportedType(key.getClass())) {
          throw new UnsupportedKeyTypeException(key);
@@ -85,6 +119,7 @@
       return key2StringMapper.getStringMapping(key);
    }
 
+   @Override
    public void storeLockSafe(InternalCacheEntry ed, String lockingKey) throws CacheLoaderException {
       InternalCacheEntry existingOne = loadLockSafe(ed, lockingKey);
       String sql;
@@ -113,6 +148,7 @@
       }
    }
 
+   @Override
    public boolean removeLockSafe(Object key, String keyStr) throws CacheLoaderException {
       Connection connection = null;
       PreparedStatement ps = null;
@@ -133,100 +169,27 @@
       }
    }
 
+   @Override
    public void fromStreamLockSafe(ObjectInput objectInput) throws CacheLoaderException {
-      Connection conn = null;
-      PreparedStatement ps = null;
-      try {
-         conn = connectionFactory.getConnection();
-         String sql = tableManipulation.getInsertRowSql();
-         ps = conn.prepareStatement(sql);
-
-         int readStoredEntries = 0;
-         int batchSize = config.getBatchSize();
-         Object objFromStream = marshaller.objectFromObjectStream(objectInput);
-         while (objFromStream instanceof InternalCacheEntry) {
-            InternalCacheEntry se = (InternalCacheEntry) objFromStream;
-            readStoredEntries++;
-            String key = key2StringMapper.getStringMapping(se.getKey());
-            ByteBuffer buffer = JdbcUtil.marshall(getMarshaller(), se.toInternalCacheValue());
-            ps.setBinaryStream(1, buffer.getStream(), buffer.getLength());
-            ps.setLong(2, se.getExpiryTime());
-            ps.setString(3, key);
-            ps.addBatch();
-            if (readStoredEntries % batchSize == 0) {
-               ps.executeBatch();
-               if (log.isTraceEnabled())
-                  log.trace("Executing batch " + (readStoredEntries / batchSize) + ", batch size is " + batchSize);
-            }
-            objFromStream = marshaller.objectFromObjectStream(objectInput);
-         }
-         if (readStoredEntries % batchSize != 0)
-            ps.executeBatch();//flush the batch
-         if (log.isTraceEnabled())
-            log.trace("Successfully inserted " + readStoredEntries + " buckets into the database, batch size is " + batchSize);
-      } catch (IOException ex) {
-         logAndThrow(ex, "I/O failure while integrating state into store");
-      } catch (SQLException e) {
-         logAndThrow(e, "SQL failure while integrating state into store");
-      } catch (ClassNotFoundException e) {
-         logAndThrow(e, "Unexpected failure while integrating state into store");
-      } finally {
-         JdbcUtil.safeClose(ps);
-         connectionFactory.releaseConnection(conn);
-      }
+      dmHelper.fromStreamSupport(objectInput);
    }
 
+   @Override
    protected void toStreamLockSafe(ObjectOutput objectOutput) throws CacheLoaderException {
-      //now write our data
-      Connection connection = null;
-      PreparedStatement ps = null;
-      ResultSet rs = null;
-      try {
-         String sql = tableManipulation.getLoadAllRowsSql();
-         if (log.isTraceEnabled()) log.trace("Running sql '" + sql);
-         connection = connectionFactory.getConnection();
-         ps = connection.prepareStatement(sql);
-         rs = ps.executeQuery();
-         rs.setFetchSize(config.getFetchSize());
-         while (rs.next()) {
-            InputStream is = rs.getBinaryStream(1);
-            InternalCacheValue icv = (InternalCacheValue) JdbcUtil.unmarshall(getMarshaller(), is);
-            Object key = rs.getObject(2);
-            marshaller.objectToObjectStream(icv.toInternalCacheEntry(key), objectOutput);
-         }
-         marshaller.objectToObjectStream(STRING_STREAM_DELIMITER, objectOutput);
-      } catch (SQLException e) {
-         logAndThrow(e, "SQL Error while storing string keys to database");
-      } catch (IOException e) {
-         logAndThrow(e, "I/O Error while storing string keys to database");
-      }
-      finally {
-         JdbcUtil.safeClose(rs);
-         JdbcUtil.safeClose(ps);
-         connectionFactory.releaseConnection(connection);
-      }
+      dmHelper.toStreamSupport(objectOutput, STRING_STREAM_DELIMITER);
    }
 
    @Override
    protected void clearLockSafe() throws CacheLoaderException {
-      Connection conn = null;
-      PreparedStatement ps = null;
-      try {
-         String sql = tableManipulation.getDeleteAllRowsSql();
-         conn = connectionFactory.getConnection();
-         ps = conn.prepareStatement(sql);
-         int result = ps.executeUpdate();
-         if (log.isTraceEnabled())
-            log.trace("Successfully removed " + result + " rows.");
-      } catch (SQLException ex) {
-         logAndThrow(ex, "Failed clearing JdbcBinaryCacheStore");
-      } finally {
-         JdbcUtil.safeClose(ps);
-         connectionFactory.releaseConnection(conn);
-      }
+      dmHelper.clear();
    }
 
    @Override
+   protected Set<InternalCacheEntry> loadAllLockSafe() throws CacheLoaderException {
+      return dmHelper.loadAllSupport();
+   }
+
+   @Override
    public void purgeInternal() throws CacheLoaderException {
       Connection conn = null;
       PreparedStatement ps = null;
@@ -246,35 +209,8 @@
       }
    }
 
-   protected Set<InternalCacheEntry> loadAllLockSafe() throws CacheLoaderException {
-      Connection conn = null;
-      PreparedStatement ps = null;
-      ResultSet rs = null;
-      try {
-         String sql = tableManipulation.getLoadAllRowsSql();
-         conn = connectionFactory.getConnection();
-         ps = conn.prepareStatement(sql);
-         rs = ps.executeQuery();
-         rs.setFetchSize(config.getFetchSize());
-         Set<InternalCacheEntry> result = new HashSet<InternalCacheEntry>();
-         while (rs.next()) {
-            InputStream inputStream = rs.getBinaryStream(1);
-            InternalCacheValue icv = (InternalCacheValue) JdbcUtil.unmarshall(getMarshaller(), inputStream);
-            Object key = rs.getObject(2);
-            result.add(icv.toInternalCacheEntry(key));
-         }
-         return result;
-      } catch (SQLException e) {
-         String message = "SQL error while fetching all StoredEntries";
-         log.error(message, e);
-         throw new CacheLoaderException(message, e);
-      } finally {
-         JdbcUtil.safeClose(rs);
-         JdbcUtil.safeClose(ps);
-         connectionFactory.releaseConnection(conn);
-      }
-   }
 
+   @Override
    protected InternalCacheEntry loadLockSafe(Object key, String lockingKey) throws CacheLoaderException {
       Connection conn = null;
       PreparedStatement ps = null;



More information about the infinispan-commits mailing list