JBoss Cache SVN: r5297 - core/trunk/src/main/resources.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-02-05 14:07:26 -0500 (Tue, 05 Feb 2008)
New Revision: 5297
Removed:
core/trunk/src/main/resources/jboss-beans.xml
Log:
removed unused file
Deleted: core/trunk/src/main/resources/jboss-beans.xml
===================================================================
--- core/trunk/src/main/resources/jboss-beans.xml 2008-02-05 18:59:35 UTC (rev 5296)
+++ core/trunk/src/main/resources/jboss-beans.xml 2008-02-05 19:07:26 UTC (rev 5297)
@@ -1,28 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-
-<deployment xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="urn:jboss:bean-deployer:2.0 bean-deployer_2_0.xsd"
- xmlns="urn:jboss:bean-deployer:2.0">
-
- <!-- Let's start with naming a few beans that we will always use regardless of configuration -->
- <bean name="cache" class="org.jboss.cache.CacheImpl">
- <!--<property name="notifier">-->
- <!--<inject bean="notifier" />-->
- <!--</property>-->
- </bean>
-
- <bean name="notifier" class="org.jboss.cache.notifications.Notifier">
- <constructor>
- <inject bean="cache" />
- </constructor>
- </bean>
-
- <bean name="rpcManager" class="org.jboss.cache.RPCManagerImpl">
- <property name="cache">
- <inject bean="cache" />
- </property>
- </bean>
-
-
-
-</deployment>
16 years, 10 months
JBoss Cache SVN: r5296 - core/trunk/src/main/java/org/jboss/cache/loader.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-02-05 13:59:35 -0500 (Tue, 05 Feb 2008)
New Revision: 5296
Modified:
core/trunk/src/main/java/org/jboss/cache/loader/FileCacheLoader.java
Log:
JBCACHE-1282 - reduce character limit warn message on newer windows systems
Modified: core/trunk/src/main/java/org/jboss/cache/loader/FileCacheLoader.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/FileCacheLoader.java 2008-02-05 18:14:08 UTC (rev 5295)
+++ core/trunk/src/main/java/org/jboss/cache/loader/FileCacheLoader.java 2008-02-05 18:59:35 UTC (rev 5296)
@@ -78,7 +78,23 @@
* For fqn, check '*' '<' '>' '|' '"' '?' and also '\' '/' and ':'
*/
public static final Pattern FQN_PATTERN = Pattern.compile("[\\\\\\/:*<>|\"?]");
+ private static boolean isOldWindows;
+ public FileCacheLoader()
+ {
+ float osVersion = -1;
+ try
+ {
+ osVersion = Float.parseFloat(System.getProperty("os.version").trim());
+ }
+ catch (Exception e)
+ {
+ // ignore
+ }
+ // os.version 4.x is Windows NT 4.0; 5.x is Windows 2000 (5.0) and XP (5.1).
+ isOldWindows = System.getProperty("os.name").toLowerCase().startsWith("windows") && osVersion < 4;
+ }
+
public void setConfig(IndividualCacheLoaderConfig base)
{
if (base instanceof FileCacheLoaderConfig)
@@ -473,9 +489,10 @@
protected boolean isLengthPortablePath(String absoluteFqnPath)
{
- if (absoluteFqnPath.length() > 255)
+
+ if (isOldWindows && absoluteFqnPath.length() > 255)
{
- log.warn("The full absolute path to the fqn that you are trying to store is bigger than 255 characters, this could lead to problems in Windows systems: " + absoluteFqnPath);
+ log.warn("The full absolute path to the fqn that you are trying to store is bigger than 255 characters, this could lead to problems on certain Windows systems: " + absoluteFqnPath);
return false;
}
16 years, 10 months
JBoss Cache SVN: r5295 - core/trunk/src/main/java/org/jboss/cache/marshall/io.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-02-05 13:14:08 -0500 (Tue, 05 Feb 2008)
New Revision: 5295
Modified:
core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectInputStream.java
Log:
Added object stream pooling for input streams as well
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectInputStream.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectInputStream.java 2008-02-05 18:06:41 UTC (rev 5294)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectInputStream.java 2008-02-05 18:14:08 UTC (rev 5295)
@@ -17,14 +17,14 @@
*/
public class ReusableObjectInputStream extends MarshalledValueInputStream
{
- ReusableByteArrayInputStream bytes;
+ ResettableByteArrayInputStream bytes;
public ReusableObjectInputStream() throws IOException, SecurityException
{
- this(new ReusableByteArrayInputStream());
+ this(new ResettableByteArrayInputStream());
}
- protected ReusableObjectInputStream(ReusableByteArrayInputStream in) throws IOException
+ protected ReusableObjectInputStream(ResettableByteArrayInputStream in) throws IOException
{
super(in);
bytes = in;
@@ -56,7 +56,7 @@
/**
* A byte array input stream that can be reused (i.e., have its byte array re-initialised).
*/
- static class ReusableByteArrayInputStream extends ByteArrayInputStream
+ static class ResettableByteArrayInputStream extends ByteArrayInputStream
{
private final static byte[] INIT_BYTES = {(byte) ((STREAM_MAGIC >>> 8) & 0xFF),
(byte) ((STREAM_MAGIC) & 0xFF),
@@ -67,7 +67,7 @@
/**
* Creates a new instance with a small placeholder buffer (object stream header bytes). Use {@link #init(byte[])} to set the byte buffer to use.
*/
- ReusableByteArrayInputStream()
+ ResettableByteArrayInputStream()
{
super(INIT_BYTES);
}
16 years, 10 months
JBoss Cache SVN: r5294 - in core/trunk/src: main/java/org/jboss/cache/marshall/io and 1 other directories.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-02-05 13:06:41 -0500 (Tue, 05 Feb 2008)
New Revision: 5294
Modified:
core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java
core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java
core/trunk/src/main/java/org/jboss/cache/marshall/io/ObjectStreamPool.java
core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectInputStream.java
core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectOutputStream.java
core/trunk/src/test/java/org/jboss/cache/marshall/io/ObjectStreamPoolTest.java
Log:
Added object stream pooling for input streams as well
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java 2008-02-05 15:45:44 UTC (rev 5293)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java 2008-02-05 18:06:41 UTC (rev 5294)
@@ -18,10 +18,7 @@
import org.jboss.cache.factories.annotations.Start;
import org.jboss.cache.transaction.GlobalTransaction;
-import java.io.ByteArrayOutputStream;
import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -75,30 +72,33 @@
// implement the basic contract set in RpcDispatcher.AbstractMarshaller
public byte[] objectToByteBuffer(Object obj) throws Exception
{
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream out = ObjectSerializationFactory.createObjectOutputStream(baos);
- objectToObjectStream(obj, out);
- out.close();
- return baos.toByteArray();
+ throw new RuntimeException("Needs to be overridden!");
+// ByteArrayOutputStream baos = new ByteArrayOutputStream();
+// ObjectOutputStream out = ObjectSerializationFactory.createObjectOutputStream(baos);
+// objectToObjectStream(obj, out);
+// out.close();
+// return baos.toByteArray();
}
public Object objectFromByteBuffer(byte[] bytes) throws Exception
{
- ObjectInputStream in = ObjectSerializationFactory.createObjectInputStream(bytes);
- return objectFromObjectStream(in);
+ throw new RuntimeException("Needs to be overridden!");
+ //ObjectInputStream in = ObjectSerializationFactory.createObjectInputStream(bytes);
+ //return objectFromObjectStream(in);
}
public Object objectFromStream(InputStream in) throws Exception
{
+ throw new RuntimeException("Needs to be overridden!");
// by default just create an OIS from this IS and pass in to the relevant method
- if (in instanceof ObjectInputStream)
- {
- return objectFromObjectStream((ObjectInputStream) in);
- }
- else
- {
- return objectFromObjectStream(ObjectSerializationFactory.createObjectInputStream(in));
- }
+// if (in instanceof ObjectInputStream)
+// {
+// return objectFromObjectStream((ObjectInputStream) in);
+// }
+// else
+// {
+// return objectFromObjectStream(ObjectSerializationFactory.createObjectInputStream(in));
+// }
}
/**
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java 2008-02-05 15:45:44 UTC (rev 5293)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java 2008-02-05 18:06:41 UTC (rev 5294)
@@ -160,11 +160,6 @@
public byte[] objectToByteBuffer(Object obj) throws Exception
{
-// ByteArrayOutputStream bos = new ByteArrayOutputStream();
-// ObjectOutputStream out;
-
- // based on the default marshaller, construct an object output stream based on what's compatible.
-// out = ObjectSerializationFactory.createObjectOutputStream(bos);
ReusableObjectOutputStream out = pool.getOutputStream();
try
{
@@ -173,16 +168,12 @@
//now marshall the contents of the object
defaultMarshaller.objectToObjectStream(obj, out);
-// out.flush();
// and return bytes.
return out.getBytes();
}
finally
{
- // return to queue
-// out.reset();
-// queue.put(out);
pool.returnStreamToPool(out);
}
}
@@ -191,44 +182,38 @@
{
Marshaller marshaller;
int versionId;
- ObjectInputStream in;
+ ObjectInputStream in = pool.getInputStream(buf);
+ ;
+
try
{
- in = ObjectSerializationFactory.createObjectInputStream(buf);
- versionId = in.readShort();
- if (trace) log.trace("Read version " + versionId);
+ try
+ {
+ versionId = in.readShort();
+ if (trace) log.trace("Read version " + versionId);
+ }
+ catch (Exception e)
+ {
+ log.error("Unable to read version id from first two bytes of stream, barfing.");
+ throw e;
+ }
+
+ marshaller = getMarshaller(versionId);
+
+ return marshaller.objectFromObjectStream(in);
}
- catch (Exception e)
+ finally
{
- log.error("Unable to read version id from first two bytes of stream, barfing.");
- throw e;
+ pool.returnStreamToPool(in);
}
-
- marshaller = getMarshaller(versionId);
-
- return marshaller.objectFromObjectStream(in);
}
public Object objectFromStream(InputStream is) throws Exception
{
- Marshaller marshaller;
- int versionId;
- ObjectInputStream in;
- try
- {
- in = ObjectSerializationFactory.createObjectInputStream(is);
- versionId = in.readShort();
- if (trace) log.trace("Read version " + versionId);
- }
- catch (Exception e)
- {
- log.error("Unable to read version id from first two bytes of stream, barfing.");
- throw e;
- }
-
- marshaller = getMarshaller(versionId);
-
- return marshaller.objectFromObjectStream(in);
+ int avbl = is.available();
+ byte[] bytes = new byte[avbl];
+ is.read(bytes, 0, avbl);
+ return objectFromByteBuffer(bytes);
}
public void objectToObjectStream(Object obj, ObjectOutputStream out, Fqn region) throws Exception
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/io/ObjectStreamPool.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/io/ObjectStreamPool.java 2008-02-05 15:45:44 UTC (rev 5293)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/io/ObjectStreamPool.java 2008-02-05 18:06:41 UTC (rev 5294)
@@ -1,8 +1,5 @@
package org.jboss.cache.marshall.io;
-import org.jboss.util.stream.MarshalledValueInputStream;
-
-import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.concurrent.BlockingQueue;
@@ -17,10 +14,8 @@
public class ObjectStreamPool
{
BlockingQueue<ReusableObjectOutputStream> outputStreams;
+ BlockingQueue<ReusableObjectInputStream> inputStreams;
- // TODO: Implement for input streams as well - much harder.
-// BlockingQueue<ReusableObjectInputStream> inputStreams;
-
/**
* Constructs the pool with a fixed number of object output and input streams
*
@@ -32,10 +27,10 @@
public ObjectStreamPool(int numOutputStreams, int numInputStreams) throws InterruptedException, IOException
{
outputStreams = new LinkedBlockingQueue<ReusableObjectOutputStream>();
-// inputStreams = new LinkedBlockingQueue<ReusableObjectInputStream>();
+ inputStreams = new LinkedBlockingQueue<ReusableObjectInputStream>();
for (int i = 0; i < numOutputStreams; i++) outputStreams.put(new ReusableObjectOutputStream());
-// for (int i=0; i<numInputStreams; i++) inputStreams.put(new ReusableObjectInputStream());
+ for (int i = 0; i < numInputStreams; i++) inputStreams.put(new ReusableObjectInputStream());
}
/**
@@ -44,15 +39,17 @@
*/
public ReusableObjectOutputStream getOutputStream() throws InterruptedException
{
- return outputStreams.take();
+ ReusableObjectOutputStream roos = outputStreams.take();
+ roos.init();
+ return roos;
}
public ObjectInputStream getInputStream(byte[] buf) throws IOException, InterruptedException
{
- return new MarshalledValueInputStream(new ByteArrayInputStream(buf));
-// ReusableObjectInputStream rois = inputStreams.take();
-// rois.init(buf);
-// return rois;
+// return new MarshalledValueInputStream(new ByteArrayInputStream(buf));
+ ReusableObjectInputStream rois = inputStreams.take();
+ rois.init(buf);
+ return rois;
}
public void returnStreamToPool(ReusableObjectOutputStream stream) throws IOException, InterruptedException
@@ -63,7 +60,7 @@
public void returnStreamToPool(ObjectInputStream stream) throws InterruptedException, IOException
{
-// stream.reset();
-// inputStreams.put((ReusableObjectInputStream) stream);
+ stream.reset();
+ inputStreams.put((ReusableObjectInputStream) stream);
}
}
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectInputStream.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectInputStream.java 2008-02-05 15:45:44 UTC (rev 5293)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectInputStream.java 2008-02-05 18:06:41 UTC (rev 5294)
@@ -3,9 +3,7 @@
import org.jboss.util.stream.MarshalledValueInputStream;
import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.ObjectStreamConstants;
/**
* A reusable ObjectInputStream that uses an internal byte array. The byte array can be set using {@link #init(byte[])},
@@ -53,7 +51,6 @@
public void init(byte[] buf) throws IOException
{
bytes.init(buf);
-// readStreamHeader();
}
/**
@@ -61,26 +58,18 @@
*/
static class ReusableByteArrayInputStream extends ByteArrayInputStream
{
- static byte[] initBytes;
+ private final static byte[] INIT_BYTES = {(byte) ((STREAM_MAGIC >>> 8) & 0xFF),
+ (byte) ((STREAM_MAGIC) & 0xFF),
+ (byte) ((STREAM_VERSION >>> 8)),
+ (byte) ((STREAM_VERSION) & 0xFF),
+ TC_BLOCKDATA, 0};
- static
- {
- ByteArrayOutputStream bytes = new ByteArrayOutputStream();
- bytes.write((byte) ((ObjectStreamConstants.STREAM_MAGIC >>> 8) & 0xFF));
- bytes.write((byte) ((ObjectStreamConstants.STREAM_MAGIC) & 0xFF));
- bytes.write((byte) ((ObjectStreamConstants.STREAM_VERSION >>> 8)));
- bytes.write((byte) ((ObjectStreamConstants.STREAM_VERSION) & 0xFF));
- bytes.write(TC_BLOCKDATA);
- bytes.write(0);
- initBytes = bytes.toByteArray();
- }
-
/**
* Creates a new instance with a small placeholder buffer (object stream header bytes). Use {@link #init(byte[])} to set the byte buffer to use.
*/
ReusableByteArrayInputStream()
{
- super(initBytes);
+ super(INIT_BYTES);
}
/**
@@ -99,7 +88,8 @@
public void init(byte[] b)
{
buf = b;
- count = 0;
+ count = buf.length;
+ pos = 0;
}
}
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectOutputStream.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectOutputStream.java 2008-02-05 15:45:44 UTC (rev 5293)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectOutputStream.java 2008-02-05 18:06:41 UTC (rev 5294)
@@ -18,6 +18,7 @@
public class ReusableObjectOutputStream extends ObjectOutputStream
{
ResettableByteArrayOutputStream baos;
+ boolean initialised = false;
public ReusableObjectOutputStream() throws IOException, SecurityException
{
@@ -28,10 +29,20 @@
{
super(baos);
this.baos = baos;
- reset();
}
/**
+ * Initialises a pooled stream for use.
+ */
+ public void init()
+ {
+ baos.init();
+ initialised = true;
+ // write a TC_RESET so that any input streams know to clear internal data structures before attempting to read
+ baos.write(TC_RESET);
+ }
+
+ /**
* Unlike {@link java.io.ObjectOutputStream#reset()}, this method will flush and wipe the underlying byte stream,
* and ensure ObjectOutputStream headers are rewritten, awaiting proper reuse.
*
@@ -40,19 +51,15 @@
@Override
public void reset() throws IOException
{
- super.reset();
- super.flush();
- baos.reset();
- writeStreamHeader();
- super.flush();
-
- // make sure there are 6 bytes in the byte stream - 2 block data markers (which need to be removed),
- // 2 magic number bytes, and 2 version bytes.
- baos.assertCount(6);
-
- // remove the 2 leading block data marker bytes, so that we now have 4 bytes which is a header that an
- // ObjectInputStream knows how to deal with.
- baos.removeBytesFromHead(2);
+ if (initialised)
+ {
+ super.reset();
+ super.flush();
+ baos.reset();
+ super.flush();
+ baos.trim();
+ initialised = false;
+ }
}
/**
@@ -69,21 +76,21 @@
*/
static class ResettableByteArrayOutputStream extends ByteArrayOutputStream
{
- public void assertCount(int expectedCount) throws IOException
+ int getBufSize()
{
- if (count != expectedCount)
- throw new IOException("Expected a count of " + expectedCount + " but was " + count);
+ return buf.length;
}
- public void removeBytesFromHead(int numBytes)
+ public void trim()
{
- if (count < numBytes)
- throw new ArrayIndexOutOfBoundsException("Can't remove " + numBytes + " from a byte array of just " + count + " bytes!");
- int newCount = count - numBytes;
- byte[] newByteArray = new byte[16];
- System.arraycopy(buf, numBytes, newByteArray, 0, newCount);
- buf = newByteArray;
- count = newCount;
+ // drop the (possibly grown) buffer so that init() can re-allocate it at its original size, to save on memory
+ buf = null;
}
+
+ public void init()
+ {
+ buf = new byte[32];
+ count = 0;
+ }
}
}
Modified: core/trunk/src/test/java/org/jboss/cache/marshall/io/ObjectStreamPoolTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/marshall/io/ObjectStreamPoolTest.java 2008-02-05 15:45:44 UTC (rev 5293)
+++ core/trunk/src/test/java/org/jboss/cache/marshall/io/ObjectStreamPoolTest.java 2008-02-05 18:06:41 UTC (rev 5294)
@@ -6,6 +6,7 @@
import java.io.IOException;
import java.io.ObjectInputStream;
+import java.io.Serializable;
import java.util.Date;
import java.util.Random;
@@ -14,24 +15,24 @@
* @since 2.1.0
*/
@Test(groups = "functional")
-public class ObjectStreamPoolTest
+public class ObjectStreamPoolTest implements Serializable
{
- ObjectStreamPool pool;
- ReusableObjectOutputStream oos;
- ObjectInputStream ois;
+ transient ObjectStreamPool pool;
+ transient ReusableObjectOutputStream oos;
+ transient ObjectInputStream ois;
@BeforeTest
public void setUp() throws IOException, InterruptedException
{
- pool = new ObjectStreamPool(2, 2);
+ pool = new ObjectStreamPool(1, 1);
}
@AfterMethod
public void afterMethod() throws IOException, InterruptedException
{
// ensure we return stuff to the pool
- pool.returnStreamToPool(oos);
- pool.returnStreamToPool(ois);
+ if (oos != null) pool.returnStreamToPool(oos);
+ if (ois != null) pool.returnStreamToPool(ois);
}
public void testStreamReset() throws IOException, ClassNotFoundException, InterruptedException
@@ -68,6 +69,18 @@
ois = pool.getInputStream(asBytes);
assert ois.readObject().equals(toWrite);
+
+ // return to pool
+ pool.returnStreamToPool(oos);
+ pool.returnStreamToPool(ois);
+
+ oos = pool.getOutputStream();
+ toWrite = new MyClass(876, "Hello");
+ oos.writeObject(toWrite);
+ asBytes = oos.getBytes();
+
+ ois = pool.getInputStream(asBytes);
+ assert ois.readObject().equals(toWrite);
}
/**
@@ -105,6 +118,39 @@
ois = pool.getInputStream(asBytes);
for (int i = 0; i < 0xFFFF; i++) assert ois.readObject().equals(toWrite);
+
+ // return to pool
+ pool.returnStreamToPool(oos);
+ pool.returnStreamToPool(ois);
+
+ oos = pool.getOutputStream();
+ toWrite = new MyClass(1, "x");
+ for (int i = 0; i < 0xFFFF; i++) oos.writeObject(toWrite);
+ asBytes = oos.getBytes();
+
+ ois = pool.getInputStream(asBytes);
+ for (int i = 0; i < 0xFFFF; i++) assert ois.readObject().equals(toWrite);
+
+ // return to pool
+ pool.returnStreamToPool(oos);
+ pool.returnStreamToPool(ois);
+
+ oos = pool.getOutputStream();
+ int arraySize = 0xFFFF;
+ Object[] toWriteArray = new Object[arraySize];
+ for (int i = 0; i < arraySize; i++)
+ {
+ toWriteArray[i] = new MyClass(i, "x");
+ oos.writeObject(toWriteArray[i]);
+ }
+ asBytes = oos.getBytes();
+
+ ois = pool.getInputStream(asBytes);
+ for (int i = 0; i < arraySize; i++)
+ {
+ Object read = ois.readObject();
+ assert read.equals(toWriteArray[i]) : "Loop " + i + ": Expected " + toWriteArray[i] + " but was " + read;
+ }
}
public void testStreamResetWithFlush() throws Exception
@@ -155,4 +201,96 @@
assert ois.readObject().equals(toWrite);
assert ois.readObject().equals(toWrite2);
}
+
+ public void testCustomTypes() throws Exception
+ {
+ oos = pool.getOutputStream();
+
+ MyClass toWrite = new MyClass(123, "hello");
+ MyClass toWrite2 = new MyClass(456, "world");
+ oos.writeObject(toWrite);
+ oos.flush();
+ oos.writeObject(toWrite2);
+ byte[] asBytes = oos.getBytes();
+
+ ois = pool.getInputStream(asBytes);
+ assert ois.readObject().equals(toWrite);
+ assert ois.readObject().equals(toWrite2);
+ }
+
+ public void testMixedTypes() throws Exception
+ {
+ oos = pool.getOutputStream();
+
+ MyClass toWrite = new MyClass(123, "hello");
+ MyClass toWrite2 = new MyClass(456, "world");
+ oos.writeObject(toWrite);
+ oos.flush();
+ oos.writeBoolean(false);
+ oos.flush();
+ oos.writeObject(toWrite2);
+ oos.flush();
+ oos.writeShort(6);
+ byte[] asBytes = oos.getBytes();
+
+ ois = pool.getInputStream(asBytes);
+ assert ois.readObject().equals(toWrite);
+ assert ois.readBoolean() == false;
+ assert ois.readObject().equals(toWrite2);
+ assert ois.readShort() == 6;
+ }
+
+ public void testReusableOutputStreamArraySize() throws InterruptedException, IOException
+ {
+ oos = pool.getOutputStream();
+ int originalLength = oos.baos.getBufSize();
+ for (int i = 0; i < 0xFF; i++) oos.write(7);
+ oos.flush();
+ assert oos.baos.getBufSize() > 0xFF;
+
+ oos.reset();
+ oos.init();
+ assert oos.baos.getBufSize() == originalLength;
+ }
+
+ public static class MyClass implements Serializable
+ {
+ int number;
+ String name;
+
+ public MyClass(int number, String name)
+ {
+ this.number = number;
+ this.name = name;
+ }
+
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ MyClass myClass = (MyClass) o;
+
+ if (number != myClass.number) return false;
+ if (name != null ? !name.equals(myClass.name) : myClass.name != null) return false;
+
+ return true;
+ }
+
+ public int hashCode()
+ {
+ int result;
+ result = number;
+ result = 31 * result + (name != null ? name.hashCode() : 0);
+ return result;
+ }
+
+ public String toString()
+ {
+ return "MyClass{" +
+ "number=" + number +
+ ", name='" + name + '\'' +
+ '}';
+ }
+ }
}
16 years, 10 months
JBoss Cache SVN: r5293 - core/trunk/src/test/java/org/jboss/cache/lock.
by jbosscache-commits@lists.jboss.org
Author: mircea.markus
Date: 2008-02-05 10:45:44 -0500 (Tue, 05 Feb 2008)
New Revision: 5293
Modified:
core/trunk/src/test/java/org/jboss/cache/lock/LockReleaseTest.java
Log:
added a test to check that if a timeout happens, locks are being released
Modified: core/trunk/src/test/java/org/jboss/cache/lock/LockReleaseTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/lock/LockReleaseTest.java 2008-02-05 15:42:05 UTC (rev 5292)
+++ core/trunk/src/test/java/org/jboss/cache/lock/LockReleaseTest.java 2008-02-05 15:45:44 UTC (rev 5293)
@@ -12,14 +12,22 @@
import org.jboss.cache.CacheSPI;
import org.jboss.cache.DefaultCacheFactory;
import org.jboss.cache.Fqn;
+import org.jboss.cache.NodeNotExistsException;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
import org.jboss.cache.transaction.TransactionSetup;
-import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.AssertJUnit.*;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import javax.transaction.UserTransaction;
+import javax.transaction.Transaction;
+import javax.transaction.NotSupportedException;
+import javax.transaction.SystemException;
import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
/**
* Verifies that there are no read locks held when a transaction ends.
@@ -225,4 +233,63 @@
tx.commit();
assertEquals("we should have released all 3 write locks: ", 0, cache.getNumberOfLocksHeld());
}
+
+ /**
+ * Tests that when an acquisition timeout occurs locks are being released.
+ */
+ @Test(invocationCount = 100)
+ public void testNodeReleaseOnAcquisitionTimeout() throws Exception
+ {
+ cache = createCache(IsolationLevel.REPEATABLE_READ);
+ cache.put("/a/b","key","value");
+ cache.put("/c","key","value");
+ final Object rLockAcquired = new Object();
+ final Object wlTimeouted = new Object();
+ final Object txLocksReleased = new Object();
+ Thread thread = new Thread() {
+ public void run()
+ {
+ try
+ {
+ cache.getTransactionManager().begin();
+ cache.get("/a/b", "key"); //at this point we have an RL on /c and /c/d
+ synchronized (rLockAcquired)
+ {
+ rLockAcquired.notify();
+ }
+ synchronized ( wlTimeouted)
+ {
+ wlTimeouted.wait(50000); //wait a long time but die eventually
+ }
+ cache.getTransactionManager().commit();//here we are releasing locks
+ synchronized (txLocksReleased)
+ {
+ txLocksReleased.notify();
+ }
+ }
+ catch(Exception ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+ };
+ thread.start();
+ synchronized (rLockAcquired) { rLockAcquired.wait(50000); }
+ try
+ {
+ cache.move("/a/b","c"); //acquired RL on /a and /a/b
+ fail("expected timeout here");
+ } catch (TimeoutException e)
+ {
+ synchronized (wlTimeouted)
+ {
+ wlTimeouted.notify();
+ }
+ }
+ synchronized (txLocksReleased)
+ {
+ txLocksReleased.wait();//wait for tx locks to be released
+ }
+ assertEquals(0, cache.getNumberOfLocksHeld());
+ }
}
16 years, 10 months
JBoss Cache SVN: r5292 - in core/trunk/src: main/java/org/jboss/cache/marshall/io and 2 other directories.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-02-05 10:42:05 -0500 (Tue, 05 Feb 2008)
New Revision: 5292
Added:
core/trunk/src/main/java/org/jboss/cache/marshall/io/
core/trunk/src/main/java/org/jboss/cache/marshall/io/ObjectStreamPool.java
core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectInputStream.java
core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectOutputStream.java
core/trunk/src/test/java/org/jboss/cache/marshall/io/
core/trunk/src/test/java/org/jboss/cache/marshall/io/ObjectStreamPoolTest.java
Modified:
core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java
Log:
Added object stream pooling (initial version for testing)
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java 2008-02-05 15:41:33 UTC (rev 5291)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java 2008-02-05 15:42:05 UTC (rev 5292)
@@ -8,13 +8,15 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.CacheException;
import org.jboss.cache.Fqn;
import org.jboss.cache.factories.ComponentRegistry;
import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.factories.annotations.Start;
+import org.jboss.cache.marshall.io.ObjectStreamPool;
+import org.jboss.cache.marshall.io.ReusableObjectOutputStream;
import org.jboss.cache.util.Util;
-import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
@@ -43,6 +45,7 @@
Marshaller defaultMarshaller;
Map<Integer, Marshaller> marshallers = new HashMap<Integer, Marshaller>();
private int versionInt;
+ ObjectStreamPool pool;
@Inject
void injectComponentRegistry(ComponentRegistry componentRegistry)
@@ -96,6 +99,15 @@
log.debug("Started with version " + replVersionString + " and versionInt " + versionInt);
log.debug("Using default marshaller class " + this.defaultMarshaller.getClass());
}
+
+ try
+ {
+ pool = new ObjectStreamPool(25, 25);
+ }
+ catch (Exception e)
+ {
+ throw new CacheException(e);
+ }
}
protected int getCustomMarshallerVersionInt()
@@ -148,20 +160,31 @@
public byte[] objectToByteBuffer(Object obj) throws Exception
{
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- ObjectOutputStream out;
+// ByteArrayOutputStream bos = new ByteArrayOutputStream();
+// ObjectOutputStream out;
// based on the default marshaller, construct an object output stream based on what's compatible.
- out = ObjectSerializationFactory.createObjectOutputStream(bos);
- out.writeShort(versionInt);
- if (trace) log.trace("Wrote version " + versionInt);
+// out = ObjectSerializationFactory.createObjectOutputStream(bos);
+ ReusableObjectOutputStream out = pool.getOutputStream();
+ try
+ {
+ out.writeShort(versionInt);
+ if (trace) log.trace("Wrote version " + versionInt);
- //now marshall the contents of the object
- defaultMarshaller.objectToObjectStream(obj, out);
- out.close();
+ //now marshall the contents of the object
+ defaultMarshaller.objectToObjectStream(obj, out);
+// out.flush();
- // and return bytes.
- return bos.toByteArray();
+ // and return bytes.
+ return out.getBytes();
+ }
+ finally
+ {
+ // return to queue
+// out.reset();
+// queue.put(out);
+ pool.returnStreamToPool(out);
+ }
}
public Object objectFromByteBuffer(byte[] buf) throws Exception
Added: core/trunk/src/main/java/org/jboss/cache/marshall/io/ObjectStreamPool.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/io/ObjectStreamPool.java (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/io/ObjectStreamPool.java 2008-02-05 15:42:05 UTC (rev 5292)
@@ -0,0 +1,69 @@
+package org.jboss.cache.marshall.io;
+
+import org.jboss.util.stream.MarshalledValueInputStream;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * A stream pool
+ *
+ * @author Manik Surtani (<a href="mailto:manik@jboss.org">manik(a)jboss.org</a>)
+ * @since 2.1.0
+ */
+public class ObjectStreamPool
+{
+ BlockingQueue<ReusableObjectOutputStream> outputStreams;
+
+ // TODO: Implement for input streams as well - much harder.
+// BlockingQueue<ReusableObjectInputStream> inputStreams;
+
+ /**
+ * Constructs the pool with a fixed number of object output and input streams
+ *
+ * @param numOutputStreams number of object output streams
+ * @param numInputStreams number of object input streams (currently unused: input streams are not pooled yet)
+ * @throws InterruptedException
+ * @throws IOException
+ */
+ public ObjectStreamPool(int numOutputStreams, int numInputStreams) throws InterruptedException, IOException
+ {
+ outputStreams = new LinkedBlockingQueue<ReusableObjectOutputStream>();
+// inputStreams = new LinkedBlockingQueue<ReusableObjectInputStream>();
+
+ for (int i = 0; i < numOutputStreams; i++) outputStreams.put(new ReusableObjectOutputStream());
+// for (int i=0; i<numInputStreams; i++) inputStreams.put(new ReusableObjectInputStream());
+ }
+
+ /**
+ * @return the next available ObjectOutputStream
+ * @throws InterruptedException
+ */
+ public ReusableObjectOutputStream getOutputStream() throws InterruptedException
+ {
+ return outputStreams.take();
+ }
+
+ public ObjectInputStream getInputStream(byte[] buf) throws IOException, InterruptedException
+ {
+ return new MarshalledValueInputStream(new ByteArrayInputStream(buf));
+// ReusableObjectInputStream rois = inputStreams.take();
+// rois.init(buf);
+// return rois;
+ }
+
+ public void returnStreamToPool(ReusableObjectOutputStream stream) throws IOException, InterruptedException
+ {
+ stream.reset();
+ outputStreams.put(stream);
+ }
+
+ public void returnStreamToPool(ObjectInputStream stream) throws InterruptedException, IOException
+ {
+// stream.reset();
+// inputStreams.put((ReusableObjectInputStream) stream);
+ }
+}
Added: core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectInputStream.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectInputStream.java (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectInputStream.java 2008-02-05 15:42:05 UTC (rev 5292)
@@ -0,0 +1,106 @@
+package org.jboss.cache.marshall.io;
+
+import org.jboss.util.stream.MarshalledValueInputStream;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectStreamConstants;
+
+/**
+ * A reusable ObjectInputStream that uses an internal byte array. The byte array can be set using {@link #init(byte[])},
+ * objects read from the byte array, and the stream reset using {@link #reset()}.
+ * <p/>
+ * These features allow the stream to be reused and pooled.
+ * <p/>
+ *
+ * @author Manik Surtani (<a href="mailto:manik@jboss.org">manik(a)jboss.org</a>)
+ * @since 2.1.0
+ */
+public class ReusableObjectInputStream extends MarshalledValueInputStream
+{
+ ReusableByteArrayInputStream bytes;
+
+ public ReusableObjectInputStream() throws IOException, SecurityException
+ {
+ this(new ReusableByteArrayInputStream());
+ }
+
+ protected ReusableObjectInputStream(ReusableByteArrayInputStream in) throws IOException
+ {
+ super(in);
+ bytes = in;
+ }
+
+ /**
+ * Resets the ObjectInputStream so it can be reused, by resetting and clearing any internal caches and by clearing the
+ * underlying byte stream to free up memory.
+ *
+ * @throws IOException
+ */
+ @Override
+ public void reset() throws IOException
+ {
+ bytes.reset();
+ bytes.clear();
+ }
+
+ /**
+ * Initialises the stream to read from the byte buffer passed in
+ *
+ * @param buf byte buffer to read from
+ */
+ public void init(byte[] buf) throws IOException
+ {
+ bytes.init(buf);
+// readStreamHeader();
+ }
+
+ /**
+ * A byte array input stream that can be reused (i.e., have its byte array re-initialised).
+ */
+ static class ReusableByteArrayInputStream extends ByteArrayInputStream
+ {
+ static byte[] initBytes;
+
+ static
+ {
+ ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+ bytes.write((byte) ((ObjectStreamConstants.STREAM_MAGIC >>> 8) & 0xFF));
+ bytes.write((byte) ((ObjectStreamConstants.STREAM_MAGIC) & 0xFF));
+ bytes.write((byte) ((ObjectStreamConstants.STREAM_VERSION >>> 8)));
+ bytes.write((byte) ((ObjectStreamConstants.STREAM_VERSION) & 0xFF));
+ bytes.write(TC_BLOCKDATA);
+ bytes.write(0);
+ initBytes = bytes.toByteArray();
+ }
+
+ /**
+ * Creates a new instance whose buffer initially holds only a minimal object stream header. Use {@link #init(byte[])} to set the byte buffer to use.
+ */
+ ReusableByteArrayInputStream()
+ {
+ super(initBytes);
+ }
+
+ /**
+ * Sets the underlying byte buffer to null, to free up memory after use.
+ */
+ public void clear()
+ {
+ buf = null;
+ }
+
+ /**
+ * Initialises the byte stream with the byte array passed in.
+ *
+ * @param b byte array to use
+ */
+ public void init(byte[] b)
+ {
+ buf = b;
+ count = 0;
+ }
+ }
+
+}
Added: core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectOutputStream.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectOutputStream.java (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectOutputStream.java 2008-02-05 15:42:05 UTC (rev 5292)
@@ -0,0 +1,89 @@
+package org.jboss.cache.marshall.io;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+
+/**
+ * An ObjectOutputStream that can safely be reset and reused. Allows for easy pooling of streams for performance.
+ * <p/>
+ * This is an ObjectOutputStream with an internal underlying byte stream, which can be queried with {@link #getBytes()}.
+ * This stream is reusable - calling {@link #reset()} will ensure the underlying byte stream is reset and the ObjectOutputStream
+ * caches are flushed, and headers are written.
+ * <p/>
+ *
+ * @author Manik Surtani (<a href="mailto:manik@jboss.org">manik(a)jboss.org</a>)
+ * @since 2.1.0
+ */
+public class ReusableObjectOutputStream extends ObjectOutputStream
+{
+ ResettableByteArrayOutputStream baos;
+
+ public ReusableObjectOutputStream() throws IOException, SecurityException
+ {
+ this(new ResettableByteArrayOutputStream());
+ }
+
+ protected ReusableObjectOutputStream(ResettableByteArrayOutputStream baos) throws IOException, SecurityException
+ {
+ super(baos);
+ this.baos = baos;
+ reset();
+ }
+
+ /**
+ * Unlike {@link java.io.ObjectOutputStream#reset()}, this method will flush and wipe the underlying byte stream,
+ * and ensure ObjectOutputStream headers are rewritten, awaiting proper reuse.
+ *
+ * @throws IOException
+ */
+ @Override
+ public void reset() throws IOException
+ {
+ super.reset();
+ super.flush();
+ baos.reset();
+ writeStreamHeader();
+ super.flush();
+
+ // make sure there are 6 bytes in the byte stream - 2 block data markers (which need to be removed),
+ // 2 magic number bytes, and 2 version bytes.
+ baos.assertCount(6);
+
+ // remove the 2 leading block data marker bytes, so that we now have 4 bytes which is a header that an
+ // ObjectInputStream knows how to deal with.
+ baos.removeBytesFromHead(2);
+ }
+
+ /**
+ * @return bytes in the underlying byte stream. Will internally do a flush first.
+ */
+ public byte[] getBytes() throws IOException
+ {
+ flush();
+ return baos.toByteArray();
+ }
+
+ /**
+ * A byte array stream that allows for more direct manipulation of the byte array.
+ */
+ static class ResettableByteArrayOutputStream extends ByteArrayOutputStream
+ {
+ public void assertCount(int expectedCount) throws IOException
+ {
+ if (count != expectedCount)
+ throw new IOException("Expected a count of " + expectedCount + " but was " + count);
+ }
+
+ public void removeBytesFromHead(int numBytes)
+ {
+ if (count < numBytes)
+ throw new ArrayIndexOutOfBoundsException("Can't remove " + numBytes + " from a byte array of just " + count + " bytes!");
+ int newCount = count - numBytes;
+ byte[] newByteArray = new byte[16];
+ System.arraycopy(buf, numBytes, newByteArray, 0, newCount);
+ buf = newByteArray;
+ count = newCount;
+ }
+ }
+}
Added: core/trunk/src/test/java/org/jboss/cache/marshall/io/ObjectStreamPoolTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/marshall/io/ObjectStreamPoolTest.java (rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/marshall/io/ObjectStreamPoolTest.java 2008-02-05 15:42:05 UTC (rev 5292)
@@ -0,0 +1,158 @@
+package org.jboss.cache.marshall.io;
+
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Date;
+import java.util.Random;
+
+/**
+ * @author Manik Surtani (<a href="mailto:manik@jboss.org">manik(a)jboss.org</a>)
+ * @since 2.1.0
+ */
+@Test(groups = "functional")
+public class ObjectStreamPoolTest
+{
+ ObjectStreamPool pool;
+ ReusableObjectOutputStream oos;
+ ObjectInputStream ois;
+
+ @BeforeTest
+ public void setUp() throws IOException, InterruptedException
+ {
+ pool = new ObjectStreamPool(2, 2);
+ }
+
+ @AfterMethod
+ public void afterMethod() throws IOException, InterruptedException
+ {
+ // ensure we return stuff to the pool
+ pool.returnStreamToPool(oos);
+ pool.returnStreamToPool(ois);
+ }
+
+ public void testStreamReset() throws IOException, ClassNotFoundException, InterruptedException
+ {
+ oos = pool.getOutputStream();
+
+ Object toWrite = "BlahBlah";
+ oos.writeObject(toWrite);
+ byte[] asBytes = oos.getBytes();
+
+ ois = pool.getInputStream(asBytes);
+ assert ois.readObject().equals(toWrite);
+
+ // return to pool
+ pool.returnStreamToPool(oos);
+ pool.returnStreamToPool(ois);
+
+ oos = pool.getOutputStream();
+ toWrite = new Integer(9);
+ oos.writeObject(toWrite);
+ asBytes = oos.getBytes();
+
+ ois = pool.getInputStream(asBytes);
+ assert ois.readObject().equals(toWrite);
+
+ // return to pool
+ pool.returnStreamToPool(oos);
+ pool.returnStreamToPool(ois);
+
+ oos = pool.getOutputStream();
+ toWrite = new Date();
+ oos.writeObject(toWrite);
+ asBytes = oos.getBytes();
+
+ ois = pool.getInputStream(asBytes);
+ assert ois.readObject().equals(toWrite);
+ }
+
+ /**
+ * To test block data header sizes
+ *
+ * @throws IOException
+ * @throws ClassNotFoundException
+ * @throws InterruptedException
+ */
+ public void testLargeStreams() throws IOException, ClassNotFoundException, InterruptedException
+ {
+ oos = pool.getOutputStream();
+
+ StringBuilder sb = new StringBuilder();
+ Random r = new Random();
+ // ~64K-character string (0xFFFF chars)
+ for (int i = 0; i < 0xFFFF; i++) sb.append((char) (r.nextInt(26) + 65));
+
+ Object toWrite = sb.toString();
+ System.out.println("Writing string (length = " + ((String) toWrite).length() + ") " + toWrite);
+ oos.writeObject(toWrite);
+ byte[] asBytes = oos.getBytes();
+
+ ois = pool.getInputStream(asBytes);
+ assert ois.readObject().equals(toWrite);
+
+ // return to pool
+ pool.returnStreamToPool(oos);
+ pool.returnStreamToPool(ois);
+
+ oos = pool.getOutputStream();
+ toWrite = new Integer(9);
+ for (int i = 0; i < 0xFFFF; i++) oos.writeObject(toWrite);
+ asBytes = oos.getBytes();
+
+ ois = pool.getInputStream(asBytes);
+ for (int i = 0; i < 0xFFFF; i++) assert ois.readObject().equals(toWrite);
+ }
+
+ public void testStreamResetWithFlush() throws Exception
+ {
+ oos = pool.getOutputStream();
+
+ Object toWrite = "BlahBlah", toWrite2 = "Hello world";
+ oos.writeObject(toWrite);
+ oos.flush();
+ oos.writeObject(toWrite2);
+ byte[] asBytes = oos.getBytes();
+
+ ois = pool.getInputStream(asBytes);
+ assert ois.readObject().equals(toWrite);
+ assert ois.readObject().equals(toWrite2);
+
+ // return to pool
+ pool.returnStreamToPool(oos);
+ pool.returnStreamToPool(ois);
+
+ oos = pool.getOutputStream();
+
+ toWrite = new Integer(9);
+ toWrite2 = new Long(99);
+ oos.writeObject(toWrite);
+ oos.flush();
+ oos.writeObject(toWrite2);
+ asBytes = oos.getBytes();
+
+ ois = pool.getInputStream(asBytes);
+ assert ois.readObject().equals(toWrite);
+ assert ois.readObject().equals(toWrite2);
+
+ // return to pool
+ pool.returnStreamToPool(oos);
+ pool.returnStreamToPool(ois);
+
+ oos = pool.getOutputStream();
+
+ toWrite = new Date();
+ toWrite2 = Boolean.FALSE;
+ oos.writeObject(toWrite);
+ oos.flush();
+ oos.writeObject(toWrite2);
+ asBytes = oos.getBytes();
+
+ ois = pool.getInputStream(asBytes);
+ assert ois.readObject().equals(toWrite);
+ assert ois.readObject().equals(toWrite2);
+ }
+}
16 years, 10 months
JBoss Cache SVN: r5291 - core/trunk/src/main/java/org/jboss/cache/marshall.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-02-05 10:41:33 -0500 (Tue, 05 Feb 2008)
New Revision: 5291
Modified:
core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java
Log:
Unnecessary boxing
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java 2008-02-05 14:33:41 UTC (rev 5290)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java 2008-02-05 15:41:33 UTC (rev 5291)
@@ -540,7 +540,7 @@
private Object unmarshallObject(ObjectInputStream in, UnmarshalledReferences refMap) throws Exception
{
byte magicNumber = in.readByte();
- Integer reference = 0;
+ int reference = 0;
Object retVal;
switch (magicNumber)
{
@@ -551,11 +551,6 @@
{
reference = readReference(in);
return refMap.getReferencedObject(reference);
-// if (!refMap.containsKey(reference))
-// {
-// throw new IOException("Unable to locate object reference " + reference + " in byte stream!");
-// }
-// return refMap.get(reference);
}
else break;
case MAGICNUMBER_SERIALIZABLE:
@@ -654,7 +649,6 @@
private String unmarshallString(ObjectInputStream in) throws Exception
{
- //return StringUtil.readString(in, null);
return (String) in.readObject();
}
16 years, 10 months
JBoss Cache SVN: r5290 - in core/trunk/src/main/java/org/jboss/cache: invocation and 1 other directories.
by jbosscache-commits@lists.jboss.org
Author: mircea.markus
Date: 2008-02-05 09:33:41 -0500 (Tue, 05 Feb 2008)
New Revision: 5290
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/PessimisticLockInterceptor.java
core/trunk/src/main/java/org/jboss/cache/invocation/NodeInvocationDelegate.java
core/trunk/src/main/java/org/jboss/cache/lock/ReadWriteLockWithUpgrade.java
Log:
fixed tests
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/PessimisticLockInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/PessimisticLockInterceptor.java 2008-02-05 04:22:07 UTC (rev 5289)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/PessimisticLockInterceptor.java 2008-02-05 14:33:41 UTC (rev 5290)
@@ -14,10 +14,7 @@
import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.annotations.ComponentName;
import org.jboss.cache.factories.annotations.Inject;
-import org.jboss.cache.lock.IsolationLevel;
-import org.jboss.cache.lock.LockingException;
-import org.jboss.cache.lock.NodeLock;
-import org.jboss.cache.lock.TimeoutException;
+import org.jboss.cache.lock.*;
import org.jboss.cache.marshall.MethodDeclarations;
import org.jboss.cache.transaction.GlobalTransaction;
import org.jboss.cache.transaction.TransactionEntry;
@@ -80,9 +77,46 @@
public Object invoke(InvocationContext ctx) throws Throwable
{
if (rootNode == null) rootNode = cache.getRoot();
- return super.invoke(ctx);
+ try
+ {
+ return super.invoke(ctx);
+ } catch (LockingException le)
+ {
+ if (trace) log.trace("Locking exception occured, cleaning up locks." , le);
+ releaseLocks(ctx);
+ throw le;
+ } catch (TimeoutException te)
+ {
+ if (trace) log.trace("Locking exception occured, cleaning up locks." , te);
+ releaseLocks(ctx);
+ throw te;
+ }
}
+ /**
+ * If an issue appears while acquiring a lock (e.g. timeout exception, upgrade exception) then
+ * release all acquired locks before rethrowing it.
+ */
+ private void releaseLocks(InvocationContext ctx)
+ {
+ GlobalTransaction gtx = ctx.getGlobalTransaction();
+ if (trace) log.trace("Releasing existing locks. Global tx?" + gtx);
+ if (gtx != null)
+ {
+ TransactionEntry te = cache.getTransactionTable().get(gtx);
+ te.releaseAllLocksFIFO(gtx);
+ } else
+ {
+ Thread currentThread = Thread.currentThread();
+ List<NodeLock> locks = getLocks(currentThread);
+ for (NodeLock aLock : locks)
+ {
+ aLock.release(currentThread);
+ }
+ }
+ }
+
+
protected Object handlePutDataMethod(InvocationContext ctx, GlobalTransaction tx, Fqn fqn, Map data, boolean createUndoOps) throws Throwable
{
return handlePutMethod(ctx, fqn);
@@ -251,9 +285,9 @@
if (trace) log.trace("There were new nodes created, skiping notification on delete");
Object[] args = ctx.getMethodCall().getArgs();
if (trace) log.trace("Changing 'skipNotification' for method '_remove' from " + args[args.length - 1] + " to true");
- args[args.length - 1] = Boolean.TRUE;
+ args[args.length - 1] = Boolean.TRUE;
}
-
+
Object retVal = nextInterceptor(ctx);
// and make sure we remove all nodes we've created for the sake of later removal.
if (ctx.getGlobalTransaction() == null)
@@ -424,13 +458,14 @@
lockTypeRequired = NodeLock.LockType.WRITE;
}
+ Fqn currentNodeFqn = currentNode.getFqn();
// actually acquire the lock we need. This method blocks.
acquireNodeLock(currentNode, owner, gtx, lockTypeRequired, timeout);
manageReverseRemove(gtx, currentNode, reverseRemoveCheck);
// make sure the lock we acquired isn't on a deleted node/is an orphan!!
// look into invalidated nodes as well
- NodeSPI repeek = peekNode(ctx, currentNode.getFqn(), true, true, true);
+ NodeSPI repeek = peekNode(ctx, currentNodeFqn, true, true, true);
if (currentNode != repeek)
{
if (trace)
@@ -445,25 +480,32 @@
if (trace) log.trace("Parent has been deleted again. Go through the lock method all over again.");
currentNode = rootNode;
parent = null;
- }
- else
+ } else
{
currentNode = parent;
+ currentIndex--;
parent = null;
if (System.currentTimeMillis() > expiryTime)
{
throw new TimeoutException("Unable to acquire lock on child node " + new Fqn(currentNode.getFqn(), childName) + " after " + timeout + " millis.");
}
+ if (trace) log.trace("Moving one level up, current node is :" + currentNode);
}
- }
- else
+ } else
{
- if (currentNode.getFqn().equals(fqn))//we've just processed the last child
+ if (currentNodeFqn.equals(fqn))//we've just processed the last child
{
break;
}
+ if (!fqn.isChildOrEquals(currentNode.getFqn()))
+ {
+ String message = new StringBuffer("currentNode instance changed the FQN(").append(currentNode.getFqn())
+ .append(") and do not match the FQN on which we want to acquire lock(").append(fqn).append(")").toString();
+ log.trace(message);
+ throw new LockingException(message);
+ }
parent = currentNode;
- currentIndex = currentNode.getFqn().size();
+ currentIndex = currentNodeFqn.size();
currentNode = currentNode.getChildDirect(fqn.get(currentIndex));
childName = fqn.get(currentIndex);
}
Modified: core/trunk/src/main/java/org/jboss/cache/invocation/NodeInvocationDelegate.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/invocation/NodeInvocationDelegate.java 2008-02-05 04:22:07 UTC (rev 5289)
+++ core/trunk/src/main/java/org/jboss/cache/invocation/NodeInvocationDelegate.java 2008-02-05 14:33:41 UTC (rev 5290)
@@ -491,4 +491,9 @@
if (!node.isValid())
throw new NodeNotValidException("Node " + getFqn() + " is not valid. Perhaps it has been moved or removed.");
}
+
+ public String toString()
+ {
+ return node == null ? null : node.toString();
+ }
}
Modified: core/trunk/src/main/java/org/jboss/cache/lock/ReadWriteLockWithUpgrade.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/lock/ReadWriteLockWithUpgrade.java 2008-02-05 04:22:07 UTC (rev 5289)
+++ core/trunk/src/main/java/org/jboss/cache/lock/ReadWriteLockWithUpgrade.java 2008-02-05 14:33:41 UTC (rev 5290)
@@ -16,7 +16,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
-/*
+/**
* <p> This class is similar to PreferredWriterReadWriteLock except that
* the read lock is upgradable to write lock. I.e., when a user calls
* upgradeLock(), it will release the read lock, wait for
16 years, 10 months
JBoss Cache SVN: r5289 - pojo/trunk/src/main/java/org/jboss/cache/pojo/interceptors.
by jbosscache-commits@lists.jboss.org
Author: jason.greene(a)jboss.com
Date: 2008-02-04 23:22:07 -0500 (Mon, 04 Feb 2008)
New Revision: 5289
Modified:
pojo/trunk/src/main/java/org/jboss/cache/pojo/interceptors/PojoTxLockInterceptor.java
Log:
Do not lock the parent node
Modified: pojo/trunk/src/main/java/org/jboss/cache/pojo/interceptors/PojoTxLockInterceptor.java
===================================================================
--- pojo/trunk/src/main/java/org/jboss/cache/pojo/interceptors/PojoTxLockInterceptor.java 2008-02-05 03:58:08 UTC (rev 5288)
+++ pojo/trunk/src/main/java/org/jboss/cache/pojo/interceptors/PojoTxLockInterceptor.java 2008-02-05 04:22:07 UTC (rev 5289)
@@ -64,32 +64,18 @@
boolean isNeeded = true;
int retry = 0;
- // If this is an internal id and also it has three levels, we are saying this is
- // Collection, and we need to lock the parent as well.
- // TODO Still a bit ad hoc.
- Fqn realId = id;
- if (id.isChildOrEquals(InternalConstant.JBOSS_INTERNAL) && id.size() > 2)
- {
- realId = id.getParent();
- if (log.isDebugEnabled())
- {
- log.debug("lockPojo(): will lock parent id instead:" + realId);
- }
- }
-
+
while (isNeeded)
{
try
{
- cache.put(realId, LOCK_KEY, "LOCK");
+ cache.put(id, LOCK_KEY, "LOCK");
isNeeded = false;
}
catch (UpgradeException upe)
{
- log.warn("lockPojo(): can't upgrade the lock during lockPojo. Will re-try. id: " + realId
+ log.warn("lockPojo(): can't upgrade the lock during lockPojo. Will re-try. id: " + id
+ " retry times: " + retry);
-// TODO is this really ok to comment out??
-// cache.get(realId).release(owner);
if (retry++ > RETRY)
{
return false;
@@ -101,12 +87,10 @@
}
catch (InterruptedException e)
{
- ;
}
}
}
return true;
}
-
}
16 years, 10 months