Author: manik.surtani(a)jboss.com
Date: 2008-02-05 10:42:05 -0500 (Tue, 05 Feb 2008)
New Revision: 5292
Added:
core/trunk/src/main/java/org/jboss/cache/marshall/io/
core/trunk/src/main/java/org/jboss/cache/marshall/io/ObjectStreamPool.java
core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectInputStream.java
core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectOutputStream.java
core/trunk/src/test/java/org/jboss/cache/marshall/io/
core/trunk/src/test/java/org/jboss/cache/marshall/io/ObjectStreamPoolTest.java
Modified:
core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java
Log:
Added object stream pooling (initial version for testing)
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java 2008-02-05
15:41:33 UTC (rev 5291)
+++
core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java 2008-02-05
15:42:05 UTC (rev 5292)
@@ -8,13 +8,15 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.CacheException;
import org.jboss.cache.Fqn;
import org.jboss.cache.factories.ComponentRegistry;
import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.factories.annotations.Start;
+import org.jboss.cache.marshall.io.ObjectStreamPool;
+import org.jboss.cache.marshall.io.ReusableObjectOutputStream;
import org.jboss.cache.util.Util;
-import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
@@ -43,6 +45,7 @@
Marshaller defaultMarshaller;
Map<Integer, Marshaller> marshallers = new HashMap<Integer,
Marshaller>();
private int versionInt;
+ ObjectStreamPool pool;
@Inject
void injectComponentRegistry(ComponentRegistry componentRegistry)
@@ -96,6 +99,15 @@
log.debug("Started with version " + replVersionString + " and
versionInt " + versionInt);
log.debug("Using default marshaller class " +
this.defaultMarshaller.getClass());
}
+
+ try
+ {
+ pool = new ObjectStreamPool(25, 25);
+ }
+ catch (Exception e)
+ {
+ throw new CacheException(e);
+ }
}
protected int getCustomMarshallerVersionInt()
@@ -148,20 +160,31 @@
public byte[] objectToByteBuffer(Object obj) throws Exception
{
+ // Marshalls obj into a byte array: a short carrying versionInt, followed by whatever
+ // defaultMarshaller writes for obj.  The object stream is borrowed from the pool
+ // rather than being created for every call.
- ByteArrayOutputStream bos = new ByteArrayOutputStream();
- ObjectOutputStream out;
+// ByteArrayOutputStream bos = new ByteArrayOutputStream();
+// ObjectOutputStream out;
// based on the default marshaller, construct an object output stream based on
what's compatible.
- out = ObjectSerializationFactory.createObjectOutputStream(bos);
- out.writeShort(versionInt);
- if (trace) log.trace("Wrote version " + versionInt);
+// out = ObjectSerializationFactory.createObjectOutputStream(bos);
+ // borrow a pooled stream; the pool hands streams out with BlockingQueue.take(), so this
+ // call waits while every pooled stream is in use
+ ReusableObjectOutputStream out = pool.getOutputStream();
+ try
+ {
+ out.writeShort(versionInt);
+ if (trace) log.trace("Wrote version " + versionInt);
- //now marshall the contents of the object
- defaultMarshaller.objectToObjectStream(obj, out);
- out.close();
+ //now marshall the contents of the object
+ defaultMarshaller.objectToObjectStream(obj, out);
+// out.flush();
- // and return bytes.
- return bos.toByteArray();
+ // and return bytes.  No explicit flush is needed: getBytes() flushes the stream
+ // before copying the underlying byte array.
+ return out.getBytes();
+ }
+ finally
+ {
+ // always hand the stream back, even when marshalling failed; returnStreamToPool()
+ // resets the stream before putting it back on the queue
+// out.reset();
+// queue.put(out);
+ pool.returnStreamToPool(out);
+ }
}
public Object objectFromByteBuffer(byte[] buf) throws Exception
Added: core/trunk/src/main/java/org/jboss/cache/marshall/io/ObjectStreamPool.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/io/ObjectStreamPool.java
(rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/io/ObjectStreamPool.java 2008-02-05
15:42:05 UTC (rev 5292)
@@ -0,0 +1,69 @@
+package org.jboss.cache.marshall.io;
+
+import org.jboss.util.stream.MarshalledValueInputStream;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ * A stream pool
+ *
+ * @author Manik Surtani (<a
href="mailto:manik@jboss.org">manik@jboss.org</a>)
+ * @since 2.1.0
+ */
+public class ObjectStreamPool
+{
+   /**
+    * Idle output streams waiting to be borrowed.  The queue is unbounded, so putting a
+    * stream back never blocks; taking one blocks while the queue is empty.
+    */
+   BlockingQueue<ReusableObjectOutputStream> outputStreams;
+
+   // TODO: Implement for input streams as well - much harder.  Until then input streams
+   // are not pooled: getInputStream() creates a new stream for every call.
+
+   /**
+    * Constructs the pool with a fixed number of object output streams.
+    *
+    * @param numOutputStreams number of object output streams to create up front
+    * @param numInputStreams  number of object input streams; currently ignored because
+    *                         input streams are not pooled yet
+    * @throws InterruptedException if interrupted while filling the pool
+    * @throws IOException          if an output stream cannot be created
+    */
+   public ObjectStreamPool(int numOutputStreams, int numInputStreams)
+         throws InterruptedException, IOException
+   {
+      outputStreams = new LinkedBlockingQueue<ReusableObjectOutputStream>();
+      for (int i = 0; i < numOutputStreams; i++)
+      {
+         outputStreams.put(new ReusableObjectOutputStream());
+      }
+   }
+
+   /**
+    * Borrows an output stream, waiting until one is available if the pool is empty.
+    * Every borrowed stream must be handed back with
+    * {@link #returnStreamToPool(ReusableObjectOutputStream)}.
+    *
+    * @return the next available ObjectOutputStream
+    * @throws InterruptedException if interrupted while waiting for a stream
+    */
+   public ReusableObjectOutputStream getOutputStream() throws InterruptedException
+   {
+      return outputStreams.take();
+   }
+
+   /**
+    * Creates an input stream that reads from the byte array passed in.  The stream is
+    * newly created on every call; it is not taken from a pool.
+    *
+    * @param buf bytes to read from
+    * @return a new stream over <tt>buf</tt>
+    * @throws IOException          if the stream cannot be created
+    * @throws InterruptedException never thrown at present; declared for when input
+    *                              streams are pooled
+    */
+   public ObjectInputStream getInputStream(byte[] buf)
+         throws IOException, InterruptedException
+   {
+      return new MarshalledValueInputStream(new ByteArrayInputStream(buf));
+   }
+
+   /**
+    * Resets an output stream and puts it back in the pool.
+    * <p/>
+    * If the reset fails the stream is discarded and a newly created stream is queued in
+    * its place, so that a failure does not permanently shrink the pool (which would
+    * eventually leave {@link #getOutputStream()} blocked forever).  The failure is still
+    * propagated to the caller.
+    *
+    * @param stream stream previously obtained from {@link #getOutputStream()}
+    * @throws IOException          if the stream (or its replacement) cannot be prepared
+    * @throws InterruptedException if interrupted while re-queuing
+    */
+   public void returnStreamToPool(ReusableObjectOutputStream stream)
+         throws IOException, InterruptedException
+   {
+      // fail before touching the queue, otherwise the replacement logic below would grow the pool
+      if (stream == null) throw new NullPointerException("stream");
+
+      boolean resetOk = false;
+      try
+      {
+         stream.reset();
+         resetOk = true;
+      }
+      finally
+      {
+         outputStreams.put(resetOk ? stream : new ReusableObjectOutputStream());
+      }
+   }
+
+   /**
+    * Does nothing at present, since input streams are not pooled.
+    *
+    * @param stream stream previously obtained from {@link #getInputStream(byte[])}
+    * @throws InterruptedException never thrown at present
+    * @throws IOException          never thrown at present
+    */
+   public void returnStreamToPool(ObjectInputStream stream)
+         throws InterruptedException, IOException
+   {
+      // TODO: reset the stream and re-queue it once input streams are pooled.
+   }
+}
Added:
core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectInputStream.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectInputStream.java
(rev 0)
+++
core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectInputStream.java 2008-02-05
15:42:05 UTC (rev 5292)
@@ -0,0 +1,106 @@
+package org.jboss.cache.marshall.io;
+
+import org.jboss.util.stream.MarshalledValueInputStream;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectStreamConstants;
+
+/**
+ * A reusable ObjectInputStream that uses an internal byte array. The byte array can be
set using {@link #init(byte[])},
+ * objects read from the byte array, and the stream reset using {@link #reset()}.
+ * <p/>
+ * These features allow the stream to be reused and pooled.
+ * <p/>
+ *
+ * @author Manik Surtani (<a
href="mailto:manik@jboss.org">manik@jboss.org</a>)
+ * @since 2.1.0
+ */
+public class ReusableObjectInputStream extends MarshalledValueInputStream
+{
+   /**
+    * The byte stream this object stream reads from; swapped to a new buffer by {@link #init(byte[])}.
+    */
+   ReusableByteArrayInputStream bytes;
+
+   /**
+    * Creates a stream over a placeholder buffer.  Call {@link #init(byte[])} before reading.
+    *
+    * @throws IOException       if the superclass constructor fails
+    * @throws SecurityException if the superclass constructor rejects this subclass
+    */
+   public ReusableObjectInputStream() throws IOException, SecurityException
+   {
+      this(new ReusableByteArrayInputStream());
+   }
+
+   protected ReusableObjectInputStream(ReusableByteArrayInputStream in) throws IOException
+   {
+      super(in);
+      bytes = in;
+   }
+
+   /**
+    * Resets the underlying byte stream and drops its buffer to free up memory, so that
+    * the stream can be given a new buffer with {@link #init(byte[])}.
+    * <p/>
+    * Only the byte stream is touched here; no state held by the object stream itself is
+    * cleared.
+    *
+    * @throws IOException declared for compatibility with the overridden method
+    */
+   @Override
+   public void reset() throws IOException
+   {
+      bytes.reset();
+      bytes.clear();
+   }
+
+   /**
+    * Initialises the stream to read from the byte buffer passed in
+    *
+    * @param buf byte buffer to read from
+    * @throws IOException declared for when the stream header is read here
+    */
+   public void init(byte[] buf) throws IOException
+   {
+      bytes.init(buf);
+      // NOTE: the stream header of the new buffer is not consumed here; the call is disabled.
+//      readStreamHeader();
+   }
+
+   /**
+    * A byte array input stream that can be reused (i.e., have its byte array
+    * re-initialised).
+    */
+   static class ReusableByteArrayInputStream extends ByteArrayInputStream
+   {
+      /**
+       * Placeholder content used until {@link #init(byte[])} is called: the stream magic
+       * number and version, followed by a block data marker and a zero byte.
+       */
+      static byte[] initBytes;
+
+      static
+      {
+         ByteArrayOutputStream bytes = new ByteArrayOutputStream();
+         bytes.write((byte) ((ObjectStreamConstants.STREAM_MAGIC >>> 8) & 0xFF));
+         bytes.write((byte) ((ObjectStreamConstants.STREAM_MAGIC) & 0xFF));
+         bytes.write((byte) ((ObjectStreamConstants.STREAM_VERSION >>> 8) & 0xFF));
+         bytes.write((byte) ((ObjectStreamConstants.STREAM_VERSION) & 0xFF));
+         bytes.write(ObjectStreamConstants.TC_BLOCKDATA);
+         bytes.write(0);
+         initBytes = bytes.toByteArray();
+      }
+
+      /**
+       * Creates a new instance over the placeholder {@link #initBytes}.  Use
+       * {@link #init(byte[])} to set the byte buffer to read from.
+       */
+      ReusableByteArrayInputStream()
+      {
+         super(initBytes);
+      }
+
+      /**
+       * Sets the underlying byte buffer to null, to free up memory after use.  The
+       * position, mark and count are zeroed as well, so that a read before the next
+       * {@link #init(byte[])} reports end-of-stream instead of dereferencing the null buffer.
+       */
+      public void clear()
+      {
+         buf = null;
+         pos = 0;
+         mark = 0;
+         count = 0;
+      }
+
+      /**
+       * Initialises the byte stream with the byte array passed in, positioned at its
+       * first byte and with all of its bytes readable.
+       *
+       * @param b byte array to use; null leaves the stream empty
+       */
+      public void init(byte[] b)
+      {
+         buf = b;
+         pos = 0;
+         mark = 0;
+         // count is the index one past the last readable byte, so it must be the array length
+         count = (b == null) ? 0 : b.length;
+      }
+   }
+
+}
Added:
core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectOutputStream.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectOutputStream.java
(rev 0)
+++
core/trunk/src/main/java/org/jboss/cache/marshall/io/ReusableObjectOutputStream.java 2008-02-05
15:42:05 UTC (rev 5292)
@@ -0,0 +1,89 @@
+package org.jboss.cache.marshall.io;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+
+/**
+ * An ObjectOutputStream that can safely be reset and reused. Allows for easy pooling of
streams for performance.
+ * <p/>
+ * This is an ObjectOutputStream with an internal underlying byte stream, which can be
queried with {@link #getBytes()}.
+ * This stream is reusable - calling {@link #reset()} will ensure the underlying byte
stream is reset and the ObjectOutputStream
+ * caches are flushed, and headers are written.
+ * <p/>
+ *
+ * @author Manik Surtani (<a
href="mailto:manik@jboss.org">manik@jboss.org</a>)
+ * @since 2.1.0
+ */
+public class ReusableObjectOutputStream extends ObjectOutputStream
+{
+   /**
+    * The in-memory byte stream everything is written to.
+    */
+   ResettableByteArrayOutputStream baos;
+
+   /**
+    * Creates a stream backed by its own in-memory byte stream.
+    *
+    * @throws IOException       if the stream header cannot be written
+    * @throws SecurityException if the superclass constructor rejects this subclass
+    */
+   public ReusableObjectOutputStream() throws IOException, SecurityException
+   {
+      this(new ResettableByteArrayOutputStream());
+   }
+
+   protected ReusableObjectOutputStream(ResettableByteArrayOutputStream baos)
+         throws IOException, SecurityException
+   {
+      super(baos);
+      this.baos = baos;
+      // bring the new stream into the same state a recycled stream is in
+      reset();
+   }
+
+   /**
+    * Unlike {@link java.io.ObjectOutputStream#reset()}, this method will flush and wipe
+    * the underlying byte stream, and ensure ObjectOutputStream headers are rewritten,
+    * awaiting proper reuse.
+    *
+    * @throws IOException if the stream cannot be reset, or if the rewritten header does
+    *                     not have the expected size
+    */
+   @Override
+   public void reset() throws IOException
+   {
+      super.reset();
+      super.flush();
+      baos.reset();
+      writeStreamHeader();
+      super.flush();
+
+      // make sure there are 6 bytes in the byte stream - 2 block data markers (which
+      // need to be removed), 2 magic number bytes, and 2 version bytes.
+      baos.assertCount(6);
+
+      // remove the 2 leading block data marker bytes, so that we now have 4 bytes which
+      // is a header that an ObjectInputStream knows how to deal with.
+      baos.removeBytesFromHead(2);
+   }
+
+   /**
+    * @return bytes in the underlying byte stream. Will internally do a flush first.
+    * @throws IOException if the flush fails
+    */
+   public byte[] getBytes() throws IOException
+   {
+      flush();
+      return baos.toByteArray();
+   }
+
+   /**
+    * A byte array stream that allows for more direct manipulation of the byte array.
+    */
+   static class ResettableByteArrayOutputStream extends ByteArrayOutputStream
+   {
+      /**
+       * Smallest buffer allocated by {@link #removeBytesFromHead(int)}.
+       */
+      private static final int MIN_CAPACITY = 16;
+
+      /**
+       * Checks the number of bytes written so far.
+       *
+       * @param expectedCount number of bytes expected in the stream
+       * @throws IOException if the stream holds a different number of bytes
+       */
+      public void assertCount(int expectedCount) throws IOException
+      {
+         if (count != expectedCount)
+            throw new IOException("Expected a count of " + expectedCount + " but was " + count);
+      }
+
+      /**
+       * Drops the first <tt>numBytes</tt> bytes written to this stream and keeps the
+       * rest.  The remaining bytes are moved into a new buffer, so a buffer that has
+       * grown large is released at the same time.
+       *
+       * @param numBytes number of leading bytes to drop
+       * @throws ArrayIndexOutOfBoundsException if <tt>numBytes</tt> is negative or larger
+       *                                        than the number of bytes in the stream
+       */
+      public void removeBytesFromHead(int numBytes)
+      {
+         if (numBytes < 0 || count < numBytes)
+            throw new ArrayIndexOutOfBoundsException("Can't remove " + numBytes
+                  + " from a byte array of just " + count + " bytes!");
+         int newCount = count - numBytes;
+         // the new buffer has to hold everything that is kept; a fixed 16 byte array
+         // overflows as soon as more than 16 bytes remain
+         byte[] newByteArray = new byte[Math.max(newCount, MIN_CAPACITY)];
+         System.arraycopy(buf, numBytes, newByteArray, 0, newCount);
+         buf = newByteArray;
+         count = newCount;
+      }
+   }
+}
Added: core/trunk/src/test/java/org/jboss/cache/marshall/io/ObjectStreamPoolTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/marshall/io/ObjectStreamPoolTest.java
(rev 0)
+++
core/trunk/src/test/java/org/jboss/cache/marshall/io/ObjectStreamPoolTest.java 2008-02-05
15:42:05 UTC (rev 5292)
@@ -0,0 +1,158 @@
+package org.jboss.cache.marshall.io;
+
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.Date;
+import java.util.Random;
+
+/**
+ * @author Manik Surtani (<a
href="mailto:manik@jboss.org">manik@jboss.org</a>)
+ * @since 2.1.0
+ */
+@Test(groups = "functional")
+public class ObjectStreamPoolTest
+{
+   ObjectStreamPool pool;
+   // streams currently borrowed by the running test; null when nothing is borrowed
+   ReusableObjectOutputStream oos;
+   ObjectInputStream ois;
+
+   @BeforeTest
+   public void setUp() throws IOException, InterruptedException
+   {
+      pool = new ObjectStreamPool(2, 2);
+   }
+
+   @AfterMethod
+   public void afterMethod() throws IOException, InterruptedException
+   {
+      // ensure we return stuff to the pool, also when the test failed half way through
+      releaseStreams();
+   }
+
+   /**
+    * Hands the streams currently held back to the pool.  The fields are cleared first, so
+    * that a stream can never be returned twice and a stream that was never obtained is
+    * not passed to the pool as null.
+    */
+   private void releaseStreams() throws IOException, InterruptedException
+   {
+      ReusableObjectOutputStream out = oos;
+      ObjectInputStream in = ois;
+      oos = null;
+      ois = null;
+      if (out != null) pool.returnStreamToPool(out);
+      if (in != null) pool.returnStreamToPool(in);
+   }
+
+   /**
+    * Writes one object per borrow/return cycle and checks that a recycled stream
+    * produces bytes that can be read back.
+    */
+   public void testStreamReset() throws IOException, ClassNotFoundException, InterruptedException
+   {
+      oos = pool.getOutputStream();
+
+      Object toWrite = "BlahBlah";
+      oos.writeObject(toWrite);
+      byte[] asBytes = oos.getBytes();
+
+      ois = pool.getInputStream(asBytes);
+      assert ois.readObject().equals(toWrite);
+
+      // return to pool
+      releaseStreams();
+
+      oos = pool.getOutputStream();
+      toWrite = Integer.valueOf(9);
+      oos.writeObject(toWrite);
+      asBytes = oos.getBytes();
+
+      ois = pool.getInputStream(asBytes);
+      assert ois.readObject().equals(toWrite);
+
+      // return to pool
+      releaseStreams();
+
+      oos = pool.getOutputStream();
+      toWrite = new Date();
+      oos.writeObject(toWrite);
+      asBytes = oos.getBytes();
+
+      ois = pool.getInputStream(asBytes);
+      assert ois.readObject().equals(toWrite);
+   }
+
+   /**
+    * To test block data header sizes
+    *
+    * @throws IOException
+    * @throws ClassNotFoundException
+    * @throws InterruptedException
+    */
+   public void testLargeStreams() throws IOException, ClassNotFoundException, InterruptedException
+   {
+      oos = pool.getOutputStream();
+
+      StringBuilder sb = new StringBuilder();
+      Random r = new Random();
+      // string of 0xFFFF (65535) random upper case letters, i.e. just under 64K
+      for (int i = 0; i < 0xFFFF; i++) sb.append((char) (r.nextInt(26) + 65));
+
+      Object toWrite = sb.toString();
+      // only the length is reported; printing the string itself floods the test output
+      System.out.println("Writing string (length = " + ((String) toWrite).length() + ")");
+      oos.writeObject(toWrite);
+      byte[] asBytes = oos.getBytes();
+
+      ois = pool.getInputStream(asBytes);
+      assert ois.readObject().equals(toWrite);
+
+      // return to pool
+      releaseStreams();
+
+      oos = pool.getOutputStream();
+      toWrite = Integer.valueOf(9);
+      for (int i = 0; i < 0xFFFF; i++) oos.writeObject(toWrite);
+      asBytes = oos.getBytes();
+
+      ois = pool.getInputStream(asBytes);
+      for (int i = 0; i < 0xFFFF; i++) assert ois.readObject().equals(toWrite);
+   }
+
+   /**
+    * Same as {@link #testStreamReset()}, but with two objects per cycle and an explicit
+    * flush between the two writes.
+    */
+   public void testStreamResetWithFlush() throws Exception
+   {
+      oos = pool.getOutputStream();
+
+      Object toWrite = "BlahBlah", toWrite2 = "Hello world";
+      oos.writeObject(toWrite);
+      oos.flush();
+      oos.writeObject(toWrite2);
+      byte[] asBytes = oos.getBytes();
+
+      ois = pool.getInputStream(asBytes);
+      assert ois.readObject().equals(toWrite);
+      assert ois.readObject().equals(toWrite2);
+
+      // return to pool
+      releaseStreams();
+
+      oos = pool.getOutputStream();
+
+      toWrite = Integer.valueOf(9);
+      toWrite2 = Long.valueOf(99);
+      oos.writeObject(toWrite);
+      oos.flush();
+      oos.writeObject(toWrite2);
+      asBytes = oos.getBytes();
+
+      ois = pool.getInputStream(asBytes);
+      assert ois.readObject().equals(toWrite);
+      assert ois.readObject().equals(toWrite2);
+
+      // return to pool
+      releaseStreams();
+
+      oos = pool.getOutputStream();
+
+      toWrite = new Date();
+      toWrite2 = Boolean.FALSE;
+      oos.writeObject(toWrite);
+      oos.flush();
+      oos.writeObject(toWrite2);
+      asBytes = oos.getBytes();
+
+      ois = pool.getInputStream(asBytes);
+      assert ois.readObject().equals(toWrite);
+      assert ois.readObject().equals(toWrite2);
+   }
+}