Author: manik.surtani(a)jboss.com
Date: 2008-02-05 13:06:41 -0500 (Tue, 05 Feb 2008)
New Revision: 5294
Modified:
core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java
core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java
core/trunk/src/main/java/org/jboss/cache/marshall/io/ObjectStreamPool.java
core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectInputStream.java
core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectOutputStream.java
core/trunk/src/test/java/org/jboss/cache/marshall/io/ObjectStreamPoolTest.java
Log:
Added object stream pooling for input streams as well
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java 2008-02-05
15:45:44 UTC (rev 5293)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java 2008-02-05
18:06:41 UTC (rev 5294)
@@ -18,10 +18,7 @@
import org.jboss.cache.factories.annotations.Start;
import org.jboss.cache.transaction.GlobalTransaction;
-import java.io.ByteArrayOutputStream;
import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -75,30 +72,33 @@
// implement the basic contract set in RPcDispatcher.AbstractMarshaller
public byte[] objectToByteBuffer(Object obj) throws Exception
{
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream out =
ObjectSerializationFactory.createObjectOutputStream(baos);
- objectToObjectStream(obj, out);
- out.close();
- return baos.toByteArray();
+ throw new RuntimeException("Needs to be overridden!");
+// ByteArrayOutputStream baos = new ByteArrayOutputStream();
+// ObjectOutputStream out =
ObjectSerializationFactory.createObjectOutputStream(baos);
+// objectToObjectStream(obj, out);
+// out.close();
+// return baos.toByteArray();
}
public Object objectFromByteBuffer(byte[] bytes) throws Exception
{
- ObjectInputStream in = ObjectSerializationFactory.createObjectInputStream(bytes);
- return objectFromObjectStream(in);
+ throw new RuntimeException("Needs to be overridden!");
+ //ObjectInputStream in =
ObjectSerializationFactory.createObjectInputStream(bytes);
+ //return objectFromObjectStream(in);
}
public Object objectFromStream(InputStream in) throws Exception
{
+ throw new RuntimeException("Needs to be overridden!");
// by default just create an OIS from this IS and pass in to the relevant method
- if (in instanceof ObjectInputStream)
- {
- return objectFromObjectStream((ObjectInputStream) in);
- }
- else
- {
- return
objectFromObjectStream(ObjectSerializationFactory.createObjectInputStream(in));
- }
+// if (in instanceof ObjectInputStream)
+// {
+// return objectFromObjectStream((ObjectInputStream) in);
+// }
+// else
+// {
+// return
objectFromObjectStream(ObjectSerializationFactory.createObjectInputStream(in));
+// }
}
/**
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java 2008-02-05
15:45:44 UTC (rev 5293)
+++
core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java 2008-02-05
18:06:41 UTC (rev 5294)
@@ -160,11 +160,6 @@
public byte[] objectToByteBuffer(Object obj) throws Exception
{
-// ByteArrayOutputStream bos = new ByteArrayOutputStream();
-// ObjectOutputStream out;
-
- // based on the default marshaller, construct an object output stream based on
what's compatible.
-// out = ObjectSerializationFactory.createObjectOutputStream(bos);
ReusableObjectOutputStream out = pool.getOutputStream();
try
{
@@ -173,16 +168,12 @@
//now marshall the contents of the object
defaultMarshaller.objectToObjectStream(obj, out);
-// out.flush();
// and return bytes.
return out.getBytes();
}
finally
{
- // return to queue
-// out.reset();
-// queue.put(out);
pool.returnStreamToPool(out);
}
}
@@ -191,44 +182,38 @@
{
Marshaller marshaller;
int versionId;
- ObjectInputStream in;
+ ObjectInputStream in = pool.getInputStream(buf);
+ ;
+
try
{
- in = ObjectSerializationFactory.createObjectInputStream(buf);
- versionId = in.readShort();
- if (trace) log.trace("Read version " + versionId);
+ try
+ {
+ versionId = in.readShort();
+ if (trace) log.trace("Read version " + versionId);
+ }
+ catch (Exception e)
+ {
+ log.error("Unable to read version id from first two bytes of stream,
barfing.");
+ throw e;
+ }
+
+ marshaller = getMarshaller(versionId);
+
+ return marshaller.objectFromObjectStream(in);
}
- catch (Exception e)
+ finally
{
- log.error("Unable to read version id from first two bytes of stream,
barfing.");
- throw e;
+ pool.returnStreamToPool(in);
}
-
- marshaller = getMarshaller(versionId);
-
- return marshaller.objectFromObjectStream(in);
}
public Object objectFromStream(InputStream is) throws Exception
{
- Marshaller marshaller;
- int versionId;
- ObjectInputStream in;
- try
- {
- in = ObjectSerializationFactory.createObjectInputStream(is);
- versionId = in.readShort();
- if (trace) log.trace("Read version " + versionId);
- }
- catch (Exception e)
- {
- log.error("Unable to read version id from first two bytes of stream,
barfing.");
- throw e;
- }
-
- marshaller = getMarshaller(versionId);
-
- return marshaller.objectFromObjectStream(in);
+ int avbl = is.available();
+ byte[] bytes = new byte[avbl];
+ is.read(bytes, 0, avbl);
+ return objectFromByteBuffer(bytes);
}
public void objectToObjectStream(Object obj, ObjectOutputStream out, Fqn region)
throws Exception
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/io/ObjectStreamPool.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/io/ObjectStreamPool.java 2008-02-05
15:45:44 UTC (rev 5293)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/io/ObjectStreamPool.java 2008-02-05
18:06:41 UTC (rev 5294)
@@ -1,8 +1,5 @@
package org.jboss.cache.marshall.io;
-import org.jboss.util.stream.MarshalledValueInputStream;
-
-import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.concurrent.BlockingQueue;
@@ -17,10 +14,8 @@
public class ObjectStreamPool
{
BlockingQueue<ReusableObjectOutputStream> outputStreams;
+ BlockingQueue<ReusableObjectInputStream> inputStreams;
- // TODO: Implement for input streams as well - much harder.
-// BlockingQueue<ReusableObjectInputStream> inputStreams;
-
/**
* Constructs the pool with a fixed number of object output and input streams
*
@@ -32,10 +27,10 @@
public ObjectStreamPool(int numOutputStreams, int numInputStreams) throws
InterruptedException, IOException
{
outputStreams = new LinkedBlockingQueue<ReusableObjectOutputStream>();
-// inputStreams = new LinkedBlockingQueue<ReusableObjectInputStream>();
+ inputStreams = new LinkedBlockingQueue<ReusableObjectInputStream>();
for (int i = 0; i < numOutputStreams; i++) outputStreams.put(new
ReusableObjectOutputStream());
-// for (int i=0; i<numInputStreams; i++) inputStreams.put(new
ReusableObjectInputStream());
+ for (int i = 0; i < numInputStreams; i++) inputStreams.put(new
ReusableObjectInputStream());
}
/**
@@ -44,15 +39,17 @@
*/
public ReusableObjectOutputStream getOutputStream() throws InterruptedException
{
- return outputStreams.take();
+ ReusableObjectOutputStream roos = outputStreams.take();
+ roos.init();
+ return roos;
}
public ObjectInputStream getInputStream(byte[] buf) throws IOException,
InterruptedException
{
- return new MarshalledValueInputStream(new ByteArrayInputStream(buf));
-// ReusableObjectInputStream rois = inputStreams.take();
-// rois.init(buf);
-// return rois;
+// return new MarshalledValueInputStream(new ByteArrayInputStream(buf));
+ ReusableObjectInputStream rois = inputStreams.take();
+ rois.init(buf);
+ return rois;
}
public void returnStreamToPool(ReusableObjectOutputStream stream) throws IOException,
InterruptedException
@@ -63,7 +60,7 @@
public void returnStreamToPool(ObjectInputStream stream) throws InterruptedException,
IOException
{
-// stream.reset();
-// inputStreams.put((ReusableObjectInputStream) stream);
+ stream.reset();
+ inputStreams.put((ReusableObjectInputStream) stream);
}
}
Modified:
core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectInputStream.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectInputStream.java 2008-02-05
15:45:44 UTC (rev 5293)
+++
core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectInputStream.java 2008-02-05
18:06:41 UTC (rev 5294)
@@ -3,9 +3,7 @@
import org.jboss.util.stream.MarshalledValueInputStream;
import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.io.ObjectStreamConstants;
/**
* A reusable ObjectInputStream that uses an internal byte array. The byte array can be
set using {@link #init(byte[])},
@@ -53,7 +51,6 @@
public void init(byte[] buf) throws IOException
{
bytes.init(buf);
-// readStreamHeader();
}
/**
@@ -61,26 +58,18 @@
*/
static class ReusableByteArrayInputStream extends ByteArrayInputStream
{
- static byte[] initBytes;
+ private final static byte[] INIT_BYTES = {(byte) ((STREAM_MAGIC >>> 8)
& 0xFF),
+ (byte) ((STREAM_MAGIC) & 0xFF),
+ (byte) ((STREAM_VERSION >>> 8)),
+ (byte) ((STREAM_VERSION) & 0xFF),
+ TC_BLOCKDATA, 0};
- static
- {
- ByteArrayOutputStream bytes = new ByteArrayOutputStream();
- bytes.write((byte) ((ObjectStreamConstants.STREAM_MAGIC >>> 8) &
0xFF));
- bytes.write((byte) ((ObjectStreamConstants.STREAM_MAGIC) & 0xFF));
- bytes.write((byte) ((ObjectStreamConstants.STREAM_VERSION >>> 8)));
- bytes.write((byte) ((ObjectStreamConstants.STREAM_VERSION) & 0xFF));
- bytes.write(TC_BLOCKDATA);
- bytes.write(0);
- initBytes = bytes.toByteArray();
- }
-
/**
* Creates a new instance with a null byte buffer. Use {@link #init(byte[])} to
set the byte buffer to use.
*/
ReusableByteArrayInputStream()
{
- super(initBytes);
+ super(INIT_BYTES);
}
/**
@@ -99,7 +88,8 @@
public void init(byte[] b)
{
buf = b;
- count = 0;
+ count = buf.length;
+ pos = 0;
}
}
Modified:
core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectOutputStream.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectOutputStream.java 2008-02-05
15:45:44 UTC (rev 5293)
+++
core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectOutputStream.java 2008-02-05
18:06:41 UTC (rev 5294)
@@ -18,6 +18,7 @@
public class ReusableObjectOutputStream extends ObjectOutputStream
{
ResettableByteArrayOutputStream baos;
+ boolean initialised = false;
public ReusableObjectOutputStream() throws IOException, SecurityException
{
@@ -28,10 +29,20 @@
{
super(baos);
this.baos = baos;
- reset();
}
/**
+ * Initialises a pooled stream for use.
+ */
+ public void init()
+ {
+ baos.init();
+ initialised = true;
+ // write a TC_RESET to any input streams know to clear internal data structures
before attempting to read
+ baos.write(TC_RESET);
+ }
+
+ /**
* Unlike {@link java.io.ObjectOutputStream#reset()}, this method will flush and wipe
the underlying byte stream,
* and ensure ObjectOutputStream headers are rewritten, awaiting proper reuse.
*
@@ -40,19 +51,15 @@
@Override
public void reset() throws IOException
{
- super.reset();
- super.flush();
- baos.reset();
- writeStreamHeader();
- super.flush();
-
- // make sure there are 6 bytes in the byte stream - 2 block data markers (which
need to be removed),
- // 2 magic number bytes, and 2 version bytes.
- baos.assertCount(6);
-
- // remove the 2 leading block data marker bytes, so that we now have 4 bytes which
is a header that an
- // ObjectInputStream knows how to deal with.
- baos.removeBytesFromHead(2);
+ if (initialised)
+ {
+ super.reset();
+ super.flush();
+ baos.reset();
+ super.flush();
+ baos.trim();
+ initialised = false;
+ }
}
/**
@@ -69,21 +76,21 @@
*/
static class ResettableByteArrayOutputStream extends ByteArrayOutputStream
{
- public void assertCount(int expectedCount) throws IOException
+ int getBufSize()
{
- if (count != expectedCount)
- throw new IOException("Expected a count of " + expectedCount +
" but was " + count);
+ return buf.length;
}
- public void removeBytesFromHead(int numBytes)
+ public void trim()
{
- if (count < numBytes)
- throw new ArrayIndexOutOfBoundsException("Can't remove " +
numBytes + " from a byte array of just " + count + " bytes!");
- int newCount = count - numBytes;
- byte[] newByteArray = new byte[16];
- System.arraycopy(buf, numBytes, newByteArray, 0, newCount);
- buf = newByteArray;
- count = newCount;
+ // make sure we trim the buffer array to it's original size, to save on
memory
+ buf = null;
}
+
+ public void init()
+ {
+ buf = new byte[32];
+ count = 0;
+ }
}
}
Modified: core/trunk/src/test/java/org/jboss/cache/marshall/io/ObjectStreamPoolTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/marshall/io/ObjectStreamPoolTest.java 2008-02-05
15:45:44 UTC (rev 5293)
+++
core/trunk/src/test/java/org/jboss/cache/marshall/io/ObjectStreamPoolTest.java 2008-02-05
18:06:41 UTC (rev 5294)
@@ -6,6 +6,7 @@
import java.io.IOException;
import java.io.ObjectInputStream;
+import java.io.Serializable;
import java.util.Date;
import java.util.Random;
@@ -14,24 +15,24 @@
* @since 2.1.0
*/
@Test(groups = "functional")
-public class ObjectStreamPoolTest
+public class ObjectStreamPoolTest implements Serializable
{
- ObjectStreamPool pool;
- ReusableObjectOutputStream oos;
- ObjectInputStream ois;
+ transient ObjectStreamPool pool;
+ transient ReusableObjectOutputStream oos;
+ transient ObjectInputStream ois;
@BeforeTest
public void setUp() throws IOException, InterruptedException
{
- pool = new ObjectStreamPool(2, 2);
+ pool = new ObjectStreamPool(1, 1);
}
@AfterMethod
public void afterMethod() throws IOException, InterruptedException
{
// ensure we return stuff to the pool
- pool.returnStreamToPool(oos);
- pool.returnStreamToPool(ois);
+ if (oos != null) pool.returnStreamToPool(oos);
+ if (ois != null) pool.returnStreamToPool(ois);
}
public void testStreamReset() throws IOException, ClassNotFoundException,
InterruptedException
@@ -68,6 +69,18 @@
ois = pool.getInputStream(asBytes);
assert ois.readObject().equals(toWrite);
+
+ // return to pool
+ pool.returnStreamToPool(oos);
+ pool.returnStreamToPool(ois);
+
+ oos = pool.getOutputStream();
+ toWrite = new MyClass(876, "Hello");
+ oos.writeObject(toWrite);
+ asBytes = oos.getBytes();
+
+ ois = pool.getInputStream(asBytes);
+ assert ois.readObject().equals(toWrite);
}
/**
@@ -105,6 +118,39 @@
ois = pool.getInputStream(asBytes);
for (int i = 0; i < 0xFFFF; i++) assert ois.readObject().equals(toWrite);
+
+ // return to pool
+ pool.returnStreamToPool(oos);
+ pool.returnStreamToPool(ois);
+
+ oos = pool.getOutputStream();
+ toWrite = new MyClass(1, "x");
+ for (int i = 0; i < 0xFFFF; i++) oos.writeObject(toWrite);
+ asBytes = oos.getBytes();
+
+ ois = pool.getInputStream(asBytes);
+ for (int i = 0; i < 0xFFFF; i++) assert ois.readObject().equals(toWrite);
+
+ // return to pool
+ pool.returnStreamToPool(oos);
+ pool.returnStreamToPool(ois);
+
+ oos = pool.getOutputStream();
+ int arraySize = 0xFFFF;
+ Object[] toWriteArray = new Object[arraySize];
+ for (int i = 0; i < arraySize; i++)
+ {
+ toWriteArray[i] = new MyClass(i, "x");
+ oos.writeObject(toWriteArray[i]);
+ }
+ asBytes = oos.getBytes();
+
+ ois = pool.getInputStream(asBytes);
+ for (int i = 0; i < arraySize; i++)
+ {
+ Object read = ois.readObject();
+ assert read.equals(toWriteArray[i]) : "Loop " + i + ": Expected
" + toWriteArray[i] + " but was " + read;
+ }
}
public void testStreamResetWithFlush() throws Exception
@@ -155,4 +201,96 @@
assert ois.readObject().equals(toWrite);
assert ois.readObject().equals(toWrite2);
}
+
+ public void testCustomTypes() throws Exception
+ {
+ oos = pool.getOutputStream();
+
+ MyClass toWrite = new MyClass(123, "hello");
+ MyClass toWrite2 = new MyClass(456, "world");
+ oos.writeObject(toWrite);
+ oos.flush();
+ oos.writeObject(toWrite2);
+ byte[] asBytes = oos.getBytes();
+
+ ois = pool.getInputStream(asBytes);
+ assert ois.readObject().equals(toWrite);
+ assert ois.readObject().equals(toWrite2);
+ }
+
+ public void testMixedTypes() throws Exception
+ {
+ oos = pool.getOutputStream();
+
+ MyClass toWrite = new MyClass(123, "hello");
+ MyClass toWrite2 = new MyClass(456, "world");
+ oos.writeObject(toWrite);
+ oos.flush();
+ oos.writeBoolean(false);
+ oos.flush();
+ oos.writeObject(toWrite2);
+ oos.flush();
+ oos.writeShort(6);
+ byte[] asBytes = oos.getBytes();
+
+ ois = pool.getInputStream(asBytes);
+ assert ois.readObject().equals(toWrite);
+ assert ois.readBoolean() == false;
+ assert ois.readObject().equals(toWrite2);
+ assert ois.readShort() == 6;
+ }
+
+ public void testReusableOutputStreamArraySize() throws InterruptedException,
IOException
+ {
+ oos = pool.getOutputStream();
+ int originalLength = oos.baos.getBufSize();
+ for (int i = 0; i < 0xFF; i++) oos.write(7);
+ oos.flush();
+ assert oos.baos.getBufSize() > 0xFF;
+
+ oos.reset();
+ oos.init();
+ assert oos.baos.getBufSize() == originalLength;
+ }
+
+ public static class MyClass implements Serializable
+ {
+ int number;
+ String name;
+
+ public MyClass(int number, String name)
+ {
+ this.number = number;
+ this.name = name;
+ }
+
+ public boolean equals(Object o)
+ {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ MyClass myClass = (MyClass) o;
+
+ if (number != myClass.number) return false;
+ if (name != null ? !name.equals(myClass.name) : myClass.name != null) return
false;
+
+ return true;
+ }
+
+ public int hashCode()
+ {
+ int result;
+ result = number;
+ result = 31 * result + (name != null ? name.hashCode() : 0);
+ return result;
+ }
+
+ public String toString()
+ {
+ return "MyClass{" +
+ "number=" + number +
+ ", name='" + name + '\'' +
+ '}';
+ }
+ }
}