[infinispan-commits] Infinispan SVN: r618 - in trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc: binary and 2 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Fri Jul 24 08:54:11 EDT 2009
Author: mircea.markus
Date: 2009-07-24 08:54:10 -0400 (Fri, 24 Jul 2009)
New Revision: 618
Added:
trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/DataManiulationHelper.java
Modified:
trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/binary/JdbcBinaryCacheStore.java
trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/mixed/JdbcMixedCacheStore.java
trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/stringbased/JdbcStringBasedCacheStore.java
Log:
[ISPN-88] (reuse code between BinaryCacheStore and StringBasedCacheStore) - implemented
Added: trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/DataManiulationHelper.java
===================================================================
--- trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/DataManiulationHelper.java (rev 0)
+++ trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/DataManiulationHelper.java 2009-07-24 12:54:10 UTC (rev 618)
@@ -0,0 +1,167 @@
+package org.infinispan.loaders.jdbc;
+
+import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.loaders.CacheLoaderException;
+import org.infinispan.loaders.jdbc.connectionfactory.ConnectionFactory;
+import org.infinispan.marshall.Marshaller;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * The purpose of this class is to factorize the repeating code between {@link org.infinispan.loaders.jdbc.stringbased.JdbcStringBasedCacheStore}
+ * and {@link org.infinispan.loaders.jdbc.binary.JdbcBinaryCacheStore}. This class implements GOF's template method pattern.
+ * <p>
+ * The shared JDBC boilerplate (connection/statement lifecycle, batching,
+ * result-set iteration, error translation) lives here; subclasses supply the
+ * store-specific per-row work through the three abstract *Process callbacks.
+ * <p>
+ * NOTE(review): the class name is misspelled - "Maniulation" should be
+ * "Manipulation". Worth renaming before this API is referenced more widely.
+ *
+ * @author Mircea.Markus at jboss.com
+ */
+public abstract class DataManiulationHelper {
+
+ // NOTE(review): should be 'private static final'.
+ private static Log log = LogFactory.getLog(DataManiulationHelper.class);
+
+ // Source of JDBC connections; each operation acquires its own connection and releases it in a finally block.
+ private ConnectionFactory connectionFactory;
+ // Supplies the store's SQL strings (insert / load-all / delete-all) and the batch and fetch size settings.
+ private TableManipulation tableManipulation;
+ // Protected so subclass callbacks can (un)marshall keys and values from the streams.
+ protected Marshaller marshaller;
+
+
+ public DataManiulationHelper(ConnectionFactory connectionFactory, TableManipulation tableManipulation, Marshaller marshaller) {
+ this.connectionFactory = connectionFactory;
+ this.tableManipulation = tableManipulation;
+ this.marshaller = marshaller;
+ }
+
+ /**
+ * Removes all rows from the store's table with a single DELETE statement.
+ *
+ * @throws CacheLoaderException if obtaining the connection or executing the delete fails
+ */
+ public void clear() throws CacheLoaderException {
+ Connection conn = null;
+ PreparedStatement ps = null;
+ try {
+ String sql = tableManipulation.getDeleteAllRowsSql();
+ conn = connectionFactory.getConnection();
+ ps = conn.prepareStatement(sql);
+ int result = ps.executeUpdate();
+ if (log.isTraceEnabled())
+ log.trace("Successfully removed " + result + " rows.");
+ } catch (SQLException ex) {
+ // NOTE(review): message hardcodes "JdbcBinaryCacheStore" but this helper is
+ // also used by JdbcStringBasedCacheStore - the message can mislead.
+ logAndThrow(ex, "Failed clearing JdbcBinaryCacheStore");
+ } finally {
+ JdbcUtil.safeClose(ps);
+ connectionFactory.releaseConnection(conn);
+ }
+ }
+
+
+ /**
+ * Reads objects from the given stream and inserts them as rows using JDBC
+ * batching. Each object pulled off the stream is handed to
+ * {@link #fromStreamProcess}; reading stops as soon as that callback returns
+ * false (i.e. the delimiter written by {@link #toStreamSupport} was reached).
+ * A batch is executed every batchSize rows, and once more at the end for any
+ * remainder (the final flush is correctly skipped when readCount is an exact
+ * multiple of batchSize, since that batch was already executed in the loop).
+ *
+ * @param objectInput stream previously produced by {@link #toStreamSupport}
+ * @throws CacheLoaderException on I/O, SQL or deserialization failure
+ */
+ public final void fromStreamSupport(ObjectInput objectInput) throws CacheLoaderException {
+ Connection conn = null;
+ PreparedStatement ps = null;
+ try {
+ conn = connectionFactory.getConnection();
+ String sql = tableManipulation.getInsertRowSql();
+ ps = conn.prepareStatement(sql);
+
+ int readCount = 0;
+ int batchSize = tableManipulation.getBatchSize();
+
+ Object objFromStream = marshaller.objectFromObjectStream(objectInput);
+ while (fromStreamProcess(objFromStream, ps, objectInput)) {
+ ps.addBatch();
+ readCount++;
+ if (readCount % batchSize == 0) {
+ ps.executeBatch();
+ if (log.isTraceEnabled())
+ log.trace("Executing batch " + (readCount / batchSize) + ", batch size is " + batchSize);
+ }
+ objFromStream = marshaller.objectFromObjectStream(objectInput);
+ }
+ if (readCount % batchSize != 0)
+ ps.executeBatch();//flush the batch
+ if (log.isTraceEnabled())
+ log.trace("Successfully inserted " + readCount + " buckets into the database, batch size is " + batchSize);
+ } catch (IOException ex) {
+ logAndThrow(ex, "I/O failure while integrating state into store");
+ } catch (SQLException e) {
+ logAndThrow(e, "SQL failure while integrating state into store");
+ } catch (ClassNotFoundException e) {
+ logAndThrow(e, "Unexpected failure while integrating state into store");
+ } finally {
+ JdbcUtil.safeClose(ps);
+ connectionFactory.releaseConnection(conn);
+ }
+ }
+
+
+ /**
+ * Streams every row of the table to the given ObjectOutput. Each row is
+ * passed to {@link #toStreamProcess} to be written in the subclass's format,
+ * and the given delimiter is written last so {@link #fromStreamSupport} can
+ * detect end-of-data.
+ *
+ * @param objectOutput destination stream
+ * @param streamDelimiter marker written after the last row
+ * @throws CacheLoaderException on SQL or I/O failure
+ */
+ public final void toStreamSupport(ObjectOutput objectOutput, byte streamDelimiter) throws CacheLoaderException {
+ //now write our data
+ Connection connection = null;
+ PreparedStatement ps = null;
+ ResultSet rs = null;
+ try {
+ String sql = tableManipulation.getLoadAllRowsSql();
+ // NOTE(review): trace message is missing the closing quote after the SQL string.
+ if (log.isTraceEnabled()) log.trace("Running sql '" + sql);
+ connection = connectionFactory.getConnection();
+ ps = connection.prepareStatement(sql);
+ rs = ps.executeQuery();
+ // Fetch-size hint applied to the open ResultSet; per JDBC it only affects subsequent fetches.
+ rs.setFetchSize(tableManipulation.getFetchSize());
+ while (rs.next()) {
+ InputStream is = rs.getBinaryStream(1);
+ toStreamProcess(rs, is, objectOutput);
+ }
+ marshaller.objectToObjectStream(streamDelimiter, objectOutput);
+ } catch (SQLException e) {
+ // NOTE(review): both messages below say "storing string keys to database",
+ // but this method reads from the database and writes to the stream.
+ logAndThrow(e, "SQL Error while storing string keys to database");
+ } catch (IOException e) {
+ logAndThrow(e, "I/O Error while storing string keys to database");
+ }
+ finally {
+ JdbcUtil.safeClose(rs);
+ JdbcUtil.safeClose(ps);
+ connectionFactory.releaseConnection(connection);
+ }
+
+ }
+
+
+ /**
+ * Loads every row of the table and collects the entries produced by
+ * {@link #loadAllProcess} into a set.
+ *
+ * @return all entries currently in the store (empty set if the table is empty)
+ * @throws CacheLoaderException on SQL failure
+ */
+ public final Set<InternalCacheEntry> loadAllSupport() throws CacheLoaderException {
+ Connection conn = null;
+ PreparedStatement ps = null;
+ ResultSet rs = null;
+ try {
+ String sql = tableManipulation.getLoadAllRowsSql();
+ conn = connectionFactory.getConnection();
+ ps = conn.prepareStatement(sql);
+ rs = ps.executeQuery();
+ rs.setFetchSize(tableManipulation.getFetchSize());
+ Set<InternalCacheEntry> result = new HashSet<InternalCacheEntry>();
+ while (rs.next()) {
+ loadAllProcess(rs, result);
+ }
+ return result;
+ } catch (SQLException e) {
+ String message = "SQL error while fetching all StoredEntries";
+ log.error(message, e);
+ throw new CacheLoaderException(message, e);
+ } finally {
+ JdbcUtil.safeClose(rs);
+ JdbcUtil.safeClose(ps);
+ connectionFactory.releaseConnection(conn);
+ }
+ }
+
+ /**
+ * Transforms the current row of {@code rs} into one or more
+ * {@link InternalCacheEntry} instances and adds them to {@code result}.
+ */
+ public abstract void loadAllProcess(ResultSet rs, Set<InternalCacheEntry> result) throws SQLException, CacheLoaderException;
+
+ /**
+ * Writes the current row (its first column already exposed as {@code is})
+ * to {@code objectOutput} in the subclass's stream format.
+ */
+ public abstract void toStreamProcess(ResultSet rs, InputStream is, ObjectOutput objectOutput) throws CacheLoaderException, SQLException, IOException;
+
+ /**
+ * Binds the given stream object onto {@code ps} as an insert row.
+ *
+ * @return true if the object was a data row (the caller will addBatch and
+ * keep reading); false if it was the end-of-stream delimiter.
+ */
+ public abstract boolean fromStreamProcess(Object objFromStream, PreparedStatement ps, ObjectInput objectInput) throws SQLException, CacheLoaderException, IOException, ClassNotFoundException;
+
+ /**
+ * Logs {@code message} with the exception, then rethrows it wrapped in a
+ * {@link CacheLoaderException} preserving {@code e} as the cause.
+ */
+ public static void logAndThrow(Exception e, String message) throws CacheLoaderException {
+ log.error(message, e);
+ throw new CacheLoaderException(message, e);
+ }
+}
Property changes on: trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/DataManiulationHelper.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified: trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/binary/JdbcBinaryCacheStore.java
===================================================================
--- trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/binary/JdbcBinaryCacheStore.java 2009-07-24 09:13:27 UTC (rev 617)
+++ trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/binary/JdbcBinaryCacheStore.java 2009-07-24 12:54:10 UTC (rev 618)
@@ -9,6 +9,7 @@
import org.infinispan.loaders.bucket.BucketBasedCacheStore;
import org.infinispan.loaders.jdbc.JdbcUtil;
import org.infinispan.loaders.jdbc.TableManipulation;
+import org.infinispan.loaders.jdbc.DataManiulationHelper;
import org.infinispan.loaders.jdbc.connectionfactory.ConnectionFactory;
import org.infinispan.marshall.Marshaller;
import org.infinispan.util.logging.Log;
@@ -51,6 +52,7 @@
private JdbcBinaryCacheStoreConfig config;
private ConnectionFactory connectionFactory;
private TableManipulation tableManipulation;
+ private DataManiulationHelper dmHelper;
public void init(CacheLoaderConfig config, Cache cache, Marshaller m) {
if (log.isTraceEnabled())
@@ -67,6 +69,36 @@
factory.start(config.getConnectionFactoryConfig());
doConnectionFactoryInitialization(factory);
}
+ dmHelper = new DataManiulationHelper(connectionFactory, tableManipulation, marshaller) {
+ @Override
+ public void loadAllProcess(ResultSet rs, Set<InternalCacheEntry> result) throws SQLException, CacheLoaderException {
+ InputStream binaryStream = rs.getBinaryStream(1);
+ Bucket bucket = (Bucket) JdbcUtil.unmarshall(getMarshaller(), binaryStream);
+ result.addAll(bucket.getStoredEntries());
+ }
+
+ @Override
+ public void toStreamProcess(ResultSet rs, InputStream is, ObjectOutput objectOutput) throws CacheLoaderException, SQLException, IOException {
+ InputStream inputStream = rs.getBinaryStream(1);
+ Bucket bucket = (Bucket) JdbcUtil.unmarshall(getMarshaller(), inputStream);
+ String bucketName = rs.getString(2);
+ marshaller.objectToObjectStream(bucketName, objectOutput);
+ marshaller.objectToObjectStream(bucket, objectOutput);
+ }
+
+ public boolean fromStreamProcess(Object bucketName, PreparedStatement ps, ObjectInput objectInput) throws SQLException, CacheLoaderException, IOException, ClassNotFoundException {
+ if (bucketName instanceof String) {
+ Bucket bucket = (Bucket) marshaller.objectFromObjectStream(objectInput);
+ ByteBuffer buffer = JdbcUtil.marshall(getMarshaller(), bucket);
+ ps.setBinaryStream(1, buffer.getStream(), buffer.getLength());
+ ps.setLong(2, bucket.timestampOfFirstEntryToExpire());
+ ps.setString(3, (String) bucketName);
+ return true;
+ } else {
+ return false;
+ }
+ }
+ };
}
public void stop() throws CacheLoaderException {
@@ -95,7 +127,7 @@
throw new CacheLoaderException("Unexpected insert result: '" + insertedRows + "'. Expected values is 1");
}
} catch (SQLException ex) {
- logAndThrow(ex, "sql failure while inserting bucket: " + bucket);
+ DataManiulationHelper.logAndThrow(ex, "sql failure while inserting bucket: " + bucket);
} finally {
JdbcUtil.safeClose(ps);
connectionFactory.releaseConnection(conn);
@@ -121,7 +153,7 @@
throw new CacheLoaderException("Unexpected update result: '" + updatedRows + "'. Expected values is 1");
}
} catch (SQLException e) {
- logAndThrow(e, "sql failure while updating bucket: " + bucket);
+ DataManiulationHelper.logAndThrow(e, "sql failure while updating bucket: " + bucket);
} finally {
JdbcUtil.safeClose(ps);
connectionFactory.releaseConnection(conn);
@@ -159,127 +191,23 @@
}
public Set<InternalCacheEntry> loadAllLockSafe() throws CacheLoaderException {
- Connection conn = null;
- PreparedStatement ps = null;
- ResultSet rs = null;
- try {
- String sql = tableManipulation.getLoadAllRowsSql();
- if (log.isTraceEnabled()) {
- log.trace("Running loadAll. Sql: '" + sql + "'");
- }
- conn = connectionFactory.getConnection();
- ps = conn.prepareStatement(sql);
- rs = ps.executeQuery();
- rs.setFetchSize(config.getFetchSize());
- Set<InternalCacheEntry> result = new HashSet<InternalCacheEntry>();
- while (rs.next()) {
- InputStream binaryStream = rs.getBinaryStream(1);
- Bucket bucket = (Bucket) JdbcUtil.unmarshall(getMarshaller(), binaryStream);
- result.addAll(bucket.getStoredEntries());
- }
- return result;
- } catch (SQLException e) {
- String message = "sql failure while loading key: ";
- log.error(message, e);
- throw new CacheLoaderException(message, e);
- } finally {
- JdbcUtil.safeClose(rs);
- JdbcUtil.safeClose(ps);
- connectionFactory.releaseConnection(conn);
- }
+ return dmHelper.loadAllSupport();
}
protected void fromStreamLockSafe(ObjectInput objectInput) throws CacheLoaderException {
- Connection conn = null;
- PreparedStatement ps = null;
- try {
- conn = connectionFactory.getConnection();
- String sql = tableManipulation.getInsertRowSql();
- ps = conn.prepareStatement(sql);
-
- int readBuckets = 0;
- int batchSize = config.getBatchSize();
- Object bucketName = marshaller.objectFromObjectStream(objectInput);
- while (bucketName instanceof String) {
- Bucket bucket = (Bucket) marshaller.objectFromObjectStream(objectInput);
- readBuckets++;
- ByteBuffer buffer = JdbcUtil.marshall(getMarshaller(), bucket);
- ps.setBinaryStream(1, buffer.getStream(), buffer.getLength());
- ps.setLong(2, bucket.timestampOfFirstEntryToExpire());
- ps.setString(3, (String) bucketName);
- if (readBuckets % batchSize == 0) {
- ps.executeBatch();
- if (log.isTraceEnabled())
- log.trace("Executing batch " + (readBuckets / batchSize) + ", batch size is " + batchSize);
- } else {
- ps.addBatch();
- }
- bucketName = marshaller.objectFromObjectStream(objectInput);
- }
- if (readBuckets % batchSize != 0)
- ps.executeBatch();//flush the batch
- if (log.isTraceEnabled())
- log.trace("Successfully inserted " + readBuckets + " buckets into the database, batch size is " + batchSize);
- } catch (IOException ex) {
- logAndThrow(ex, "I/O failure while integrating state into store");
- } catch (SQLException e) {
- logAndThrow(e, "SQL failure while integrating state into store");
- } catch (ClassNotFoundException e) {
- logAndThrow(e, "Unexpected failure while integrating state into store");
- } finally {
- JdbcUtil.safeClose(ps);
- connectionFactory.releaseConnection(conn);
- }
+ dmHelper.fromStreamSupport(objectInput);
}
protected void toStreamLockSafe(ObjectOutput objectOutput) throws CacheLoaderException {
- Connection conn = null;
- PreparedStatement ps = null;
- ResultSet rs = null;
- try {
- conn = connectionFactory.getConnection();
- String sql = tableManipulation.getLoadAllRowsSql();
- ps = conn.prepareStatement(sql);
- rs = ps.executeQuery();
- rs.setFetchSize(config.getFetchSize());
- while (rs.next()) {
- InputStream inputStream = rs.getBinaryStream(1);
- Bucket bucket = (Bucket) JdbcUtil.unmarshall(getMarshaller(), inputStream);
- String bucketName = rs.getString(2);
- marshaller.objectToObjectStream(bucketName, objectOutput);
- marshaller.objectToObjectStream(bucket, objectOutput);
- }
- marshaller.objectToObjectStream(BINARY_STREAM_DELIMITER, objectOutput);
- } catch (SQLException ex) {
- logAndThrow(ex, "SQL failure while writing store's content to stream");
- }
- catch (IOException e) {
- logAndThrow(e, "IO failure while writing store's content to stream");
- } finally {
- JdbcUtil.safeClose(rs);
- JdbcUtil.safeClose(ps);
- connectionFactory.releaseConnection(conn);
- }
+ dmHelper.toStreamSupport(objectOutput, BINARY_STREAM_DELIMITER);
}
+ @Override
protected void clearLockSafe() throws CacheLoaderException {
- Connection conn = null;
- PreparedStatement ps = null;
- try {
- String sql = tableManipulation.getDeleteAllRowsSql();
- conn = connectionFactory.getConnection();
- ps = conn.prepareStatement(sql);
- int result = ps.executeUpdate();
- if (log.isTraceEnabled())
- log.trace("Successfully removed " + result + " rows.");
- } catch (SQLException ex) {
- logAndThrow(ex, "Failed clearing JdbcBinaryCacheStore");
- } finally {
- JdbcUtil.safeClose(ps);
- connectionFactory.releaseConnection(conn);
- }
+ dmHelper.clear();
}
+ @Override
public void purgeInternal() throws CacheLoaderException {
Connection conn = null;
PreparedStatement ps = null;
@@ -309,7 +237,7 @@
//if something happens make sure buckets locks are being release
releaseLocks(expiredBuckets);
connectionFactory.releaseConnection(conn);
- logAndThrow(ex, "Failed clearing JdbcBinaryCacheStore");
+ DataManiulationHelper.logAndThrow(ex, "Failed clearing JdbcBinaryCacheStore");
} finally {
JdbcUtil.safeClose(ps);
JdbcUtil.safeClose(rs);
@@ -353,7 +281,7 @@
//if something happens make sure buckets locks are being release
releaseLocks(emptyBuckets);
connectionFactory.releaseConnection(conn);
- logAndThrow(ex, "Failed clearing JdbcBinaryCacheStore");
+ DataManiulationHelper.logAndThrow(ex, "Failed clearing JdbcBinaryCacheStore");
} finally {
//release locks for the updated buckets.This won't include empty buckets, as these were migrated to emptyBuckets
releaseLocks(expiredBuckets);
@@ -386,7 +314,7 @@
}
} catch (SQLException ex) {
//if something happens make sure buckets locks are being release
- logAndThrow(ex, "Failed clearing JdbcBinaryCacheStore");
+ DataManiulationHelper.logAndThrow(ex, "Failed clearing JdbcBinaryCacheStore");
} finally {
releaseLocks(emptyBuckets);
JdbcUtil.safeClose(ps);
@@ -404,11 +332,6 @@
return JdbcBinaryCacheStoreConfig.class;
}
- protected void logAndThrow(Exception e, String message) throws CacheLoaderException {
- log.error(message, e);
- throw new CacheLoaderException(message, e);
- }
-
public ConnectionFactory getConnectionFactory() {
return connectionFactory;
}
Modified: trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/mixed/JdbcMixedCacheStore.java
===================================================================
--- trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/mixed/JdbcMixedCacheStore.java 2009-07-24 09:13:27 UTC (rev 617)
+++ trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/mixed/JdbcMixedCacheStore.java 2009-07-24 12:54:10 UTC (rev 618)
@@ -62,10 +62,10 @@
ConnectionFactoryConfig factoryConfig = config.getConnectionFactoryConfig();
sharedConnectionFactory = ConnectionFactory.getConnectionFactory(factoryConfig.getConnectionFactoryClass());
sharedConnectionFactory.start(factoryConfig);
+ binaryCacheStore.doConnectionFactoryInitialization(sharedConnectionFactory);
binaryCacheStore.start();
- binaryCacheStore.doConnectionFactoryInitialization(sharedConnectionFactory);
+ stringBasedCacheStore.doConnectionFactoryInitialization(sharedConnectionFactory);
stringBasedCacheStore.start();
- stringBasedCacheStore.doConnectionFactoryInitialization(sharedConnectionFactory);
}
@Override
Modified: trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/stringbased/JdbcStringBasedCacheStore.java
===================================================================
--- trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/stringbased/JdbcStringBasedCacheStore.java 2009-07-24 09:13:27 UTC (rev 617)
+++ trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/stringbased/JdbcStringBasedCacheStore.java 2009-07-24 12:54:10 UTC (rev 618)
@@ -7,6 +7,7 @@
import org.infinispan.loaders.CacheLoaderConfig;
import org.infinispan.loaders.CacheLoaderException;
import org.infinispan.loaders.LockSupportCacheStore;
+import org.infinispan.loaders.jdbc.DataManiulationHelper;
import org.infinispan.loaders.jdbc.JdbcUtil;
import org.infinispan.loaders.jdbc.TableManipulation;
import org.infinispan.loaders.jdbc.connectionfactory.ConnectionFactory;
@@ -22,7 +23,6 @@
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
-import java.util.HashSet;
import java.util.Set;
/**
@@ -53,6 +53,7 @@
private Key2StringMapper key2StringMapper;
private ConnectionFactory connectionFactory;
private TableManipulation tableManipulation;
+ private DataManiulationHelper dmHelper;
public void init(CacheLoaderConfig config, Cache cache, Marshaller m) {
super.init(config, cache, m);
@@ -69,8 +70,40 @@
doConnectionFactoryInitialization(connectionFactory);
}
this.key2StringMapper = config.getKey2StringMapper();
+ dmHelper = new DataManiulationHelper(connectionFactory, tableManipulation, marshaller) {
+
+ @Override
+ public void loadAllProcess(ResultSet rs, Set<InternalCacheEntry> result) throws SQLException, CacheLoaderException {
+ InputStream inputStream = rs.getBinaryStream(1);
+ InternalCacheValue icv = (InternalCacheValue) JdbcUtil.unmarshall(getMarshaller(), inputStream);
+ Object key = rs.getObject(2);
+ result.add(icv.toInternalCacheEntry(key));
+ }
+
+ @Override
+ public void toStreamProcess(ResultSet rs, InputStream is, ObjectOutput objectOutput) throws CacheLoaderException, SQLException, IOException {
+ InternalCacheValue icv = (InternalCacheValue) JdbcUtil.unmarshall(getMarshaller(), is);
+ Object key = rs.getObject(2);
+ marshaller.objectToObjectStream(icv.toInternalCacheEntry(key), objectOutput);
+ }
+
+ public boolean fromStreamProcess(Object objFromStream, PreparedStatement ps, ObjectInput objectInput) throws SQLException, CacheLoaderException {
+ if (objFromStream instanceof InternalCacheEntry) {
+ InternalCacheEntry se = (InternalCacheEntry) objFromStream;
+ String key = key2StringMapper.getStringMapping(se.getKey());
+ ByteBuffer buffer = JdbcUtil.marshall(getMarshaller(), se.toInternalCacheValue());
+ ps.setBinaryStream(1, buffer.getStream(), buffer.getLength());
+ ps.setLong(2, se.getExpiryTime());
+ ps.setString(3, key);
+ return true;
+ } else {
+ return false;
+ }
+ }
+ };
}
+ @Override
public void stop() throws CacheLoaderException {
tableManipulation.stop();
if (config.isManageConnectionFactory()) {
@@ -78,6 +111,7 @@
}
}
+ @Override
protected String getLockFromKey(Object key) throws CacheLoaderException {
if (!key2StringMapper.isSupportedType(key.getClass())) {
throw new UnsupportedKeyTypeException(key);
@@ -85,6 +119,7 @@
return key2StringMapper.getStringMapping(key);
}
+ @Override
public void storeLockSafe(InternalCacheEntry ed, String lockingKey) throws CacheLoaderException {
InternalCacheEntry existingOne = loadLockSafe(ed, lockingKey);
String sql;
@@ -113,6 +148,7 @@
}
}
+ @Override
public boolean removeLockSafe(Object key, String keyStr) throws CacheLoaderException {
Connection connection = null;
PreparedStatement ps = null;
@@ -133,100 +169,27 @@
}
}
+ @Override
public void fromStreamLockSafe(ObjectInput objectInput) throws CacheLoaderException {
- Connection conn = null;
- PreparedStatement ps = null;
- try {
- conn = connectionFactory.getConnection();
- String sql = tableManipulation.getInsertRowSql();
- ps = conn.prepareStatement(sql);
-
- int readStoredEntries = 0;
- int batchSize = config.getBatchSize();
- Object objFromStream = marshaller.objectFromObjectStream(objectInput);
- while (objFromStream instanceof InternalCacheEntry) {
- InternalCacheEntry se = (InternalCacheEntry) objFromStream;
- readStoredEntries++;
- String key = key2StringMapper.getStringMapping(se.getKey());
- ByteBuffer buffer = JdbcUtil.marshall(getMarshaller(), se.toInternalCacheValue());
- ps.setBinaryStream(1, buffer.getStream(), buffer.getLength());
- ps.setLong(2, se.getExpiryTime());
- ps.setString(3, key);
- ps.addBatch();
- if (readStoredEntries % batchSize == 0) {
- ps.executeBatch();
- if (log.isTraceEnabled())
- log.trace("Executing batch " + (readStoredEntries / batchSize) + ", batch size is " + batchSize);
- }
- objFromStream = marshaller.objectFromObjectStream(objectInput);
- }
- if (readStoredEntries % batchSize != 0)
- ps.executeBatch();//flush the batch
- if (log.isTraceEnabled())
- log.trace("Successfully inserted " + readStoredEntries + " buckets into the database, batch size is " + batchSize);
- } catch (IOException ex) {
- logAndThrow(ex, "I/O failure while integrating state into store");
- } catch (SQLException e) {
- logAndThrow(e, "SQL failure while integrating state into store");
- } catch (ClassNotFoundException e) {
- logAndThrow(e, "Unexpected failure while integrating state into store");
- } finally {
- JdbcUtil.safeClose(ps);
- connectionFactory.releaseConnection(conn);
- }
+ dmHelper.fromStreamSupport(objectInput);
}
+ @Override
protected void toStreamLockSafe(ObjectOutput objectOutput) throws CacheLoaderException {
- //now write our data
- Connection connection = null;
- PreparedStatement ps = null;
- ResultSet rs = null;
- try {
- String sql = tableManipulation.getLoadAllRowsSql();
- if (log.isTraceEnabled()) log.trace("Running sql '" + sql);
- connection = connectionFactory.getConnection();
- ps = connection.prepareStatement(sql);
- rs = ps.executeQuery();
- rs.setFetchSize(config.getFetchSize());
- while (rs.next()) {
- InputStream is = rs.getBinaryStream(1);
- InternalCacheValue icv = (InternalCacheValue) JdbcUtil.unmarshall(getMarshaller(), is);
- Object key = rs.getObject(2);
- marshaller.objectToObjectStream(icv.toInternalCacheEntry(key), objectOutput);
- }
- marshaller.objectToObjectStream(STRING_STREAM_DELIMITER, objectOutput);
- } catch (SQLException e) {
- logAndThrow(e, "SQL Error while storing string keys to database");
- } catch (IOException e) {
- logAndThrow(e, "I/O Error while storing string keys to database");
- }
- finally {
- JdbcUtil.safeClose(rs);
- JdbcUtil.safeClose(ps);
- connectionFactory.releaseConnection(connection);
- }
+ dmHelper.toStreamSupport(objectOutput, STRING_STREAM_DELIMITER);
}
@Override
protected void clearLockSafe() throws CacheLoaderException {
- Connection conn = null;
- PreparedStatement ps = null;
- try {
- String sql = tableManipulation.getDeleteAllRowsSql();
- conn = connectionFactory.getConnection();
- ps = conn.prepareStatement(sql);
- int result = ps.executeUpdate();
- if (log.isTraceEnabled())
- log.trace("Successfully removed " + result + " rows.");
- } catch (SQLException ex) {
- logAndThrow(ex, "Failed clearing JdbcBinaryCacheStore");
- } finally {
- JdbcUtil.safeClose(ps);
- connectionFactory.releaseConnection(conn);
- }
+ dmHelper.clear();
}
@Override
+ protected Set<InternalCacheEntry> loadAllLockSafe() throws CacheLoaderException {
+ return dmHelper.loadAllSupport();
+ }
+
+ @Override
public void purgeInternal() throws CacheLoaderException {
Connection conn = null;
PreparedStatement ps = null;
@@ -246,35 +209,8 @@
}
}
- protected Set<InternalCacheEntry> loadAllLockSafe() throws CacheLoaderException {
- Connection conn = null;
- PreparedStatement ps = null;
- ResultSet rs = null;
- try {
- String sql = tableManipulation.getLoadAllRowsSql();
- conn = connectionFactory.getConnection();
- ps = conn.prepareStatement(sql);
- rs = ps.executeQuery();
- rs.setFetchSize(config.getFetchSize());
- Set<InternalCacheEntry> result = new HashSet<InternalCacheEntry>();
- while (rs.next()) {
- InputStream inputStream = rs.getBinaryStream(1);
- InternalCacheValue icv = (InternalCacheValue) JdbcUtil.unmarshall(getMarshaller(), inputStream);
- Object key = rs.getObject(2);
- result.add(icv.toInternalCacheEntry(key));
- }
- return result;
- } catch (SQLException e) {
- String message = "SQL error while fetching all StoredEntries";
- log.error(message, e);
- throw new CacheLoaderException(message, e);
- } finally {
- JdbcUtil.safeClose(rs);
- JdbcUtil.safeClose(ps);
- connectionFactory.releaseConnection(conn);
- }
- }
+ @Override
protected InternalCacheEntry loadLockSafe(Object key, String lockingKey) throws CacheLoaderException {
Connection conn = null;
PreparedStatement ps = null;
More information about the infinispan-commits
mailing list