Author: manik.surtani(a)jboss.com
Date: 2009-03-01 06:33:40 -0500 (Sun, 01 Mar 2009)
New Revision: 7809
Modified:
core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/decorators/AbstractDelegatingStore.java
core/branches/flat/src/main/java/org/horizon/loader/decorators/ChainingCacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/decorators/ReadOnlyStore.java
core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStore.java
core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java
core/branches/flat/src/test/java/org/horizon/loader/decorators/ReadOnlyCacheStoreTest.java
core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheStore.java
core/branches/flat/src/test/java/org/horizon/loader/file/FileCacheStoreTest.java
Log:
Changed CacheStore stream API, added javadocs
Modified: core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java 2009-02-28
00:39:42 UTC (rev 7808)
+++ core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java 2009-03-01
11:33:40 UTC (rev 7809)
@@ -3,8 +3,8 @@
import org.horizon.loader.modifications.Modification;
import javax.transaction.Transaction;
-import java.io.InputStream;
-import java.io.OutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.util.List;
import java.util.Set;
@@ -26,40 +26,42 @@
/**
* Writes contents of the stream to the store. Implementations should expect that the
stream contains data in an
- * implementation-specific format, typically generated using {@link
#toStream(java.io.OutputStream)}. While not a
- * requirement, it is recommended that implementations make use of the {@link
org.horizon.marshall.Marshaller} when
- * dealing with the stream to make use of efficient marshalling.
- * <p />
+ * implementation-specific format, typically generated using {@link
#toStream(java.io.ObjectOutputStream)}. While
+ * not a requirement, it is recommended that implementations make use of the {@link
org.horizon.marshall.Marshaller}
+ * when dealing with the stream to make use of efficient marshalling.
+ * <p/>
* It is imperative that implementations <b><i>do not</i></b>
close the stream after finishing with it.
- * <p />
- * It is also <b><i>recommended</b></i> that implementations
use their own start and end markers on the stream
- * since other processes may write additional data to the stream after the cache store
has written to it. As such,
- * either markers or some other mechanism to prevent the store from reading too much
information should be employed
- * when writing to the stream in {@link #fromStream(java.io.InputStream)} to prevent
data corruption.
- * <p />
+ * <p/>
+ * It is also <b><i>recommended</i></b> that implementations
use their own start and end markers on the stream since
+ * other processes may write additional data to the stream after the cache store has
written to it. As such, either
+ * markers or some other mechanism to prevent the store from reading too much
information should be employed when
+ * reading from the stream in {@link #fromStream(java.io.ObjectInputStream)} to prevent
data corruption.
+ * <p/>
+ *
* @param inputStream stream to read from
* @throws CacheLoaderException in the event of problems writing to the store
*/
- void fromStream(InputStream inputStream) throws CacheLoaderException;
+ void fromStream(ObjectInputStream inputStream) throws CacheLoaderException;
/**
* Loads the entire state into a stream, using whichever format is most efficient for
the cache loader
- * implementation. Typically read and parsed by {@link
#fromStream(java.io.InputStream)}.
+ * implementation. Typically read and parsed by {@link
#fromStream(java.io.ObjectInputStream)}.
* <p/>
* While not a requirement, it is recommended that implementations make use of the
{@link
* org.horizon.marshall.Marshaller} when dealing with the stream to make use of
efficient marshalling.
- * <p />
- * It is imperative that implementations <b><i>do not</i></b>
close the stream after finishing with it.
- * <p />
- * It is also <b><i>recommended</b></i> that implementations
use their own start and end markers on the stream
- * since other processes may write additional data to the stream after the cache store
has written to it. As such,
- * either markers or some other mechanism to prevent the store from reading too much
information in {@link #fromStream(java.io.InputStream)}
- * should be employed, to prevent data corruption.
- * <p />
+ * <p/>
+ * It is imperative that implementations <b><i>do not</i></b>
flush or close the stream after finishing with it.
+ * <p/>
+ * It is also <b><i>recommended</i></b> that implementations
use their own start and end markers on the stream since
+ * other processes may write additional data to the stream after the cache store has
written to it. As such, either
+ * markers or some other mechanism to prevent the store from reading too much
information in {@link
+ * #fromStream(java.io.ObjectInputStream)} should be employed, to prevent data
corruption.
+ * <p/>
+ *
* @param outputStream stream to write to
* @throws CacheLoaderException in the event of problems reading from the store
*/
- void toStream(OutputStream outputStream) throws CacheLoaderException;
+ void toStream(ObjectOutputStream outputStream) throws CacheLoaderException;
/**
* Clears all entries in the store
@@ -104,7 +106,17 @@
void prepare(List<? extends Modification> modifications, Transaction tx, boolean
isOnePhase) throws CacheLoaderException;
/**
- * Commits a transaction that has been previously prepared
+ * Commits a transaction that has been previously prepared.
+ * <p/>
+ * This method <i>may</i> be invoked on a transaction for which there is
<i>no</i> prior {@link
+ * #prepare(java.util.List, javax.transaction.Transaction, boolean)}. The
implementation would need to deal with
+ * this case accordingly. Typically, this would be a no-op, after ensuring any
resources attached to the transaction
+ * are cleared up.
+ * <p/>
+ * Also note that this method <i>may</i> be invoked on a thread which is
different from the {@link
+ * #prepare(java.util.List, javax.transaction.Transaction, boolean)} invocation. As
such, {@link ThreadLocal}s
+ * should not be relied upon to maintain transaction context.
+ * <p/>
*
* @param tx tx to commit
* @throws CacheLoaderException in the event of problems writing to the store
@@ -113,6 +125,16 @@
/**
* Rolls back a transaction that has been previously prepared
+ * <p/>
+ * This method <i>may</i> be invoked on a transaction for which there is
<i>no</i> prior {@link
+ * #prepare(java.util.List, javax.transaction.Transaction, boolean)}. The
implementation would need to deal with
+ * this case accordingly. Typically, this would be a no-op, after ensuring any
resources attached to the transaction
+ * are cleared up.
+ * <p/>
+ * Also note that this method <i>may</i> be invoked on a thread which is
different from the {@link
+ * #prepare(java.util.List, javax.transaction.Transaction, boolean)} invocation. As
such, {@link ThreadLocal}s
+ * should not be relied upon to maintain transaction context.
+ * <p/>
*
* @param tx tx to roll back
*/
Modified:
core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStore.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStore.java 2009-02-28
00:39:42 UTC (rev 7808)
+++
core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStore.java 2009-03-01
11:33:40 UTC (rev 7809)
@@ -3,24 +3,21 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.horizon.Cache;
-import org.horizon.util.concurrent.WithinThreadExecutor;
import org.horizon.loader.AbstractCacheStore;
import org.horizon.loader.CacheLoaderConfig;
import org.horizon.loader.CacheLoaderException;
import org.horizon.loader.StoredEntry;
import org.horizon.lock.StripedLock;
import org.horizon.marshall.Marshaller;
+import org.horizon.util.concurrent.WithinThreadExecutor;
-import java.util.concurrent.TimeUnit;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.ObjectOutputStream;
/**
* //TODO comment this
@@ -129,40 +126,23 @@
}
}
- public void fromStream(InputStream inputStream) throws CacheLoaderException {
- ObjectInputStream ois = null;
+ public void fromStream(ObjectInputStream inputStream) throws CacheLoaderException {
try {
// first clear all local state
acquireGlobalLock(true);
clear();
- ois = (inputStream instanceof ObjectInputStream) ? (ObjectInputStream)
inputStream :
- new ObjectInputStream(inputStream);
- fromStreamInternal(ois);
- }
- catch (IOException e) {
- throw new CacheLoaderException("Cannot convert to ObjectInputSream",
e);
+ fromStreamInternal(inputStream);
} finally {
releaseGlobalLock(true);
- // we should close the stream we created!
- if (inputStream != ois) safeClose(ois);
}
}
- public void toStream(OutputStream outputStream) throws CacheLoaderException {
- ObjectOutputStream oos = null;
+ public void toStream(ObjectOutputStream outputStream) throws CacheLoaderException {
try {
acquireGlobalLock(true);
- try {
- oos = (outputStream instanceof ObjectOutputStream) ? (ObjectOutputStream)
outputStream :
- new ObjectOutputStream(outputStream);
- } catch (IOException e) {
- throw new CacheLoaderException(e);
- }
- toStreamInternal(oos);
+ toStreamInternal(outputStream);
} finally {
releaseGlobalLock(true);
- // we should close the stream we created!
- if (oos != outputStream) safeClose(oos);
}
}
Modified:
core/branches/flat/src/main/java/org/horizon/loader/decorators/AbstractDelegatingStore.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/decorators/AbstractDelegatingStore.java 2009-02-28
00:39:42 UTC (rev 7808)
+++
core/branches/flat/src/main/java/org/horizon/loader/decorators/AbstractDelegatingStore.java 2009-03-01
11:33:40 UTC (rev 7809)
@@ -10,8 +10,8 @@
import org.horizon.marshall.Marshaller;
import javax.transaction.Transaction;
-import java.io.InputStream;
-import java.io.OutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.util.List;
import java.util.Set;
@@ -42,11 +42,11 @@
delegate.store(ed);
}
- public void fromStream(InputStream inputStream) throws CacheLoaderException {
+ public void fromStream(ObjectInputStream inputStream) throws CacheLoaderException {
delegate.fromStream(inputStream);
}
- public void toStream(OutputStream outputStream) throws CacheLoaderException {
+ public void toStream(ObjectOutputStream outputStream) throws CacheLoaderException {
delegate.toStream(outputStream);
}
Modified:
core/branches/flat/src/main/java/org/horizon/loader/decorators/ChainingCacheStore.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/decorators/ChainingCacheStore.java 2009-02-28
00:39:42 UTC (rev 7808)
+++
core/branches/flat/src/main/java/org/horizon/loader/decorators/ChainingCacheStore.java 2009-03-01
11:33:40 UTC (rev 7809)
@@ -10,8 +10,8 @@
import org.horizon.marshall.Marshaller;
import javax.transaction.Transaction;
-import java.io.InputStream;
-import java.io.OutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
@@ -39,7 +39,7 @@
for (CacheStore s : stores.keySet()) s.store(ed);
}
- public void fromStream(InputStream inputStream) throws CacheLoaderException {
+ public void fromStream(ObjectInputStream inputStream) throws CacheLoaderException {
// loading and storing state via streams is *only* supported on the *first* store
that has fetchPersistentState set.
for (Map.Entry<CacheStore, CacheLoaderConfig> e : stores.entrySet()) {
if (e.getValue().isFetchPersistentState()) {
@@ -50,7 +50,7 @@
}
}
- public void toStream(OutputStream outputStream) throws CacheLoaderException {
+ public void toStream(ObjectOutputStream outputStream) throws CacheLoaderException {
// loading and storing state via streams is *only* supported on the *first* store
that has fetchPersistentState set.
for (Map.Entry<CacheStore, CacheLoaderConfig> e : stores.entrySet()) {
if (e.getValue().isFetchPersistentState()) {
Modified:
core/branches/flat/src/main/java/org/horizon/loader/decorators/ReadOnlyStore.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/decorators/ReadOnlyStore.java 2009-02-28
00:39:42 UTC (rev 7808)
+++
core/branches/flat/src/main/java/org/horizon/loader/decorators/ReadOnlyStore.java 2009-03-01
11:33:40 UTC (rev 7809)
@@ -5,7 +5,7 @@
import org.horizon.loader.modifications.Modification;
import javax.transaction.Transaction;
-import java.io.InputStream;
+import java.io.ObjectInputStream;
import java.util.List;
/**
@@ -27,7 +27,7 @@
}
@Override
- public void fromStream(InputStream inputStream) {
+ public void fromStream(ObjectInputStream inputStream) {
// no-op
}
Modified:
core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStore.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStore.java 2009-02-28
00:39:42 UTC (rev 7808)
+++
core/branches/flat/src/main/java/org/horizon/loader/decorators/SingletonStore.java 2009-03-01
11:33:40 UTC (rev 7809)
@@ -17,7 +17,7 @@
import org.horizon.remoting.transport.Address;
import javax.transaction.Transaction;
-import java.io.InputStream;
+import java.io.ObjectInputStream;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
@@ -112,7 +112,7 @@
}
@Override
- public void fromStream(InputStream inputStream) throws CacheLoaderException {
+ public void fromStream(ObjectInputStream inputStream) throws CacheLoaderException {
if (active) super.fromStream(inputStream);
}
Modified: core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java 2009-02-28
00:39:42 UTC (rev 7808)
+++ core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java 2009-03-01
11:33:40 UTC (rev 7809)
@@ -17,6 +17,8 @@
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
@@ -247,10 +249,14 @@
cs.store(new StoredEntry("k3", "v3", -1, -1));
ByteArrayOutputStream out = new ByteArrayOutputStream();
- cs.toStream(out);
+ ObjectOutputStream oos = new ObjectOutputStream(out);
+ cs.toStream(oos);
+ oos.flush();
+ oos.close();
out.close();
cs.clear();
- cs.fromStream(new ByteArrayInputStream(out.toByteArray()));
+ ObjectInputStream ois = new ObjectInputStream(new
ByteArrayInputStream(out.toByteArray()));
+ cs.fromStream(ois);
Set<StoredEntry> set = cs.loadAll();
@@ -269,10 +275,13 @@
cs.store(new StoredEntry("k3", "v3", -1, -1));
ByteArrayOutputStream out = new ByteArrayOutputStream();
- byte[] dummyStartBytes = {1,2,3,4,5,6,7,8};
- byte[] dummyEndBytes = {8,7,6,5,4,3,2,1};
+ byte[] dummyStartBytes = {1, 2, 3, 4, 5, 6, 7, 8};
+ byte[] dummyEndBytes = {8, 7, 6, 5, 4, 3, 2, 1};
out.write(dummyStartBytes);
- cs.toStream(out);
+ ObjectOutputStream oos = new ObjectOutputStream(out);
+ cs.toStream(oos);
+ oos.flush();
+ oos.close();
out.write(dummyEndBytes);
out.close();
cs.clear();
@@ -282,11 +291,11 @@
ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
int bytesRead = in.read(dummy, 0, 8);
assert bytesRead == 8;
- for (int i=1; i<9; i++) assert dummy[i - 1] == i : "Start byte stream
corrupted!";
- cs.fromStream(in);
+ for (int i = 1; i < 9; i++) assert dummy[i - 1] == i : "Start byte stream
corrupted!";
+ cs.fromStream(new ObjectInputStream(in));
bytesRead = in.read(dummy, 0, 8);
assert bytesRead == 8;
- for (int i=8; i>0; i--) assert dummy[8 - i] == i : "Start byte stream
corrupted!";
+ for (int i = 8; i > 0; i--) assert dummy[8 - i] == i : "Start byte stream
corrupted!";
Set<StoredEntry> set = cs.loadAll();
Modified:
core/branches/flat/src/test/java/org/horizon/loader/decorators/ReadOnlyCacheStoreTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/loader/decorators/ReadOnlyCacheStoreTest.java 2009-02-28
00:39:42 UTC (rev 7808)
+++
core/branches/flat/src/test/java/org/horizon/loader/decorators/ReadOnlyCacheStoreTest.java 2009-03-01
11:33:40 UTC (rev 7809)
@@ -6,8 +6,6 @@
import org.horizon.loader.StoredEntry;
import org.testng.annotations.Test;
-import java.io.InputStream;
-
@Test(groups = "unit", testName =
"loader.decorators.ReadOnlyCacheStoreTest")
public class ReadOnlyCacheStoreTest {
public void testWriteMethods() throws CacheLoaderException {
@@ -21,8 +19,8 @@
store.clear();
store.purgeExpired();
store.remove("key");
- store.store((StoredEntry) null);
- store.fromStream((InputStream) null);
+ store.store(null);
+ store.fromStream(null);
store.prepare(null, null, true);
store.commit(null);
store.rollback(null);
Modified:
core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheStore.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheStore.java 2009-02-28
00:39:42 UTC (rev 7808)
+++
core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheStore.java 2009-03-01
11:33:40 UTC (rev 7809)
@@ -11,10 +11,8 @@
import org.horizon.marshall.Marshaller;
import org.horizon.marshall.ObjectStreamMarshaller;
-import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
-import java.io.OutputStream;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
@@ -36,11 +34,8 @@
}
@SuppressWarnings("unchecked")
- public void fromStream(InputStream inputStream) throws CacheLoaderException {
+ public void fromStream(ObjectInputStream ois) throws CacheLoaderException {
try {
- ObjectInputStream ois = inputStream instanceof ObjectInputStream ?
(ObjectInputStream) inputStream :
- new ObjectInputStream(inputStream);
-
int numEntries = (Integer) marshaller.objectFromObjectStream(ois);
store.clear();
for (int i = 0; i < numEntries; i++) {
@@ -52,10 +47,8 @@
}
}
- public void toStream(OutputStream outputStream) throws CacheLoaderException {
+ public void toStream(ObjectOutputStream oos) throws CacheLoaderException {
try {
- ObjectOutputStream oos = outputStream instanceof ObjectOutputStream ?
(ObjectOutputStream) outputStream :
- new ObjectOutputStream(outputStream);
marshaller.objectToObjectStream(store.size(), oos);
for (StoredEntry se : store.values()) marshaller.objectToObjectStream(se, oos);
} catch (Exception e) {
Modified:
core/branches/flat/src/test/java/org/horizon/loader/file/FileCacheStoreTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/loader/file/FileCacheStoreTest.java 2009-02-28
00:39:42 UTC (rev 7808)
+++
core/branches/flat/src/test/java/org/horizon/loader/file/FileCacheStoreTest.java 2009-03-01
11:33:40 UTC (rev 7809)
@@ -15,6 +15,7 @@
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
@Test(groups = "unit", testName = "loader.file.FileCacheStoreTest")
public class FileCacheStoreTest extends BaseCacheStoreTest {
@@ -89,8 +90,10 @@
cs.store(new StoredEntry("k1", "v1", -1, -1));
ByteArrayOutputStream out = new ByteArrayOutputStream();
- cs.toStream(out);
- out.flush();
+ ObjectOutputStream oos = new ObjectOutputStream(out);
+ cs.toStream(oos);
+ oos.flush();
+ oos.close();
out.close();
ObjectInputStream ois = null;