Author: manik.surtani(a)jboss.com
Date: 2008-08-04 06:47:51 -0400 (Mon, 04 Aug 2008)
New Revision: 6486
Added:
core/trunk/src/main/java/org/jboss/cache/io/
core/trunk/src/main/java/org/jboss/cache/io/ByteBuffer.java
core/trunk/src/main/java/org/jboss/cache/io/ExposedByteArrayOutputStream.java
Removed:
core/trunk/src/main/java/org/jboss/cache/util/ExposedByteArrayOutputStream.java
Modified:
core/trunk/src/main/java/org/jboss/cache/loader/AdjListJDBCCacheLoader.java
core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java
core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java
core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java
core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java
Log:
Updated to use new JGroups Marshaller2 interface and Buffer class instead of byte arrays.
Also created a new subclass of Buffer - ByteBuffer - that exposes a stream on top of the
byte buffer. Finally, moveed EBAOS to o.j.c.io instead of o.j.c.util.
Added: core/trunk/src/main/java/org/jboss/cache/io/ByteBuffer.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/io/ByteBuffer.java
(rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/io/ByteBuffer.java 2008-08-04 10:47:51 UTC
(rev 6486)
@@ -0,0 +1,25 @@
+package org.jboss.cache.io;
+
+import org.jgroups.util.Buffer;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+
+/**
+ * A subclass of a JGroups Buffer
+ */
+public class ByteBuffer extends Buffer
+{
+ public ByteBuffer(byte[] bytes, int offset, int length)
+ {
+ super(bytes, offset, length);
+ }
+
+ /**
+ * @return an input stream for the bytes in the buffer
+ */
+ public InputStream getStream()
+ {
+ return new ByteArrayInputStream(getBuf(), getOffset(), getLength());
+ }
+}
Copied: core/trunk/src/main/java/org/jboss/cache/io/ExposedByteArrayOutputStream.java
(from rev 6485,
core/trunk/src/main/java/org/jboss/cache/util/ExposedByteArrayOutputStream.java)
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/io/ExposedByteArrayOutputStream.java
(rev 0)
+++
core/trunk/src/main/java/org/jboss/cache/io/ExposedByteArrayOutputStream.java 2008-08-04
10:47:51 UTC (rev 6486)
@@ -0,0 +1,145 @@
+/*
+ * JBoss, the OpenSource J2EE webOS
+ *
+ * Distributable under LGPL license.
+ * See terms of license at
gnu.org.
+ */
+
+package org.jboss.cache.io;
+
+import net.jcip.annotations.NotThreadSafe;
+
+import java.io.ByteArrayOutputStream;
+
+/**
+ * Extends ByteArrayOutputStream, but exposes the internal buffer.
+ * Using this, callers don't need to call toByteArray() which copies the
+ * internal buffer.
+ * <p>
+ * Also overrides the superclass' behavior of always doubling the size of the
+ * internal buffer any time more capacity is needed. This class doubles the
+ * size until the internal buffer reaches a configurable max size (default is
+ * 4MB), after which it begins growing the buffer in 25% increments. This is
+ * intended to help prevent an OutOfMemoryError during a resize of a large
+ * buffer.
+ * </p>
+ * <p>
+ * A version of this class was originally created by Bela Ban as part of the
+ * JGroups library.
+ * </p>
+ * This class is not threadsafe as it will not support concurrent readers and writers.
+ * <p/>
+ *
+ * @author <a href="mailto://brian.stansberry@jboss.com">Brian
Stansberry</a>
+ * @version $Id$
+ */
+@NotThreadSafe
+public class ExposedByteArrayOutputStream extends ByteArrayOutputStream
+{
+ /**
+ * Default buffer size after which if more buffer capacity
+ * is needed the buffer will grow by 25% rather than 100%
+ */
+ public static final int DEFAULT_DOUBLING_SIZE = 4 * 1024 * 1024; // 4MB
+
+ private int maxDoublingSize = DEFAULT_DOUBLING_SIZE;
+
+ public ExposedByteArrayOutputStream()
+ {
+ super();
+ }
+
+ public ExposedByteArrayOutputStream(int size)
+ {
+ super(size);
+ }
+
+ /**
+ * Creates a new byte array output stream, with a buffer capacity of
+ * the specified size, in bytes.
+ *
+ * @param size the initial size.
+ * @param maxDoublingSize the buffer size, after which if more capacity
+ * is needed the buffer will grow by 25%
+ * rather than 100%
+ * @throws IllegalArgumentException if size is negative.
+ */
+ public ExposedByteArrayOutputStream(int size, int maxDoublingSize)
+ {
+ super(size);
+ this.maxDoublingSize = maxDoublingSize;
+ }
+
+ /**
+ * Gets the internal buffer array. Note that the length of this array
+ * will almost certainly be longer than the data written to it; call
+ * <code>size()</code> to get the number of bytes of actual data.
+ */
+ public byte[] getRawBuffer()
+ {
+ return buf;
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len)
+ {
+ if ((off < 0) || (off > b.length) || (len < 0) ||
+ ((off + len) > b.length) || ((off + len) < 0))
+ {
+ throw new IndexOutOfBoundsException();
+ }
+ else if (len == 0)
+ {
+ return;
+ }
+
+ int newcount = count + len;
+ if (newcount > buf.length)
+ {
+ byte newbuf[] = new byte[getNewBufferSize(buf.length, newcount)];
+ System.arraycopy(buf, 0, newbuf, 0, count);
+ buf = newbuf;
+ }
+
+ System.arraycopy(b, off, buf, count, len);
+ count = newcount;
+ }
+
+ @Override
+ public void write(int b)
+ {
+ int newcount = count + 1;
+ if (newcount > buf.length)
+ {
+ byte newbuf[] = new byte[getNewBufferSize(buf.length, newcount)];
+ System.arraycopy(buf, 0, newbuf, 0, count);
+ buf = newbuf;
+ }
+ buf[count] = (byte) b;
+ count = newcount;
+ }
+
+ /**
+ * Gets the highest internal buffer size after which if more capacity
+ * is needed the buffer will grow in 25% increments rather than 100%.
+ */
+ public int getMaxDoublingSize()
+ {
+ return maxDoublingSize;
+ }
+
+ /**
+ * Gets the number of bytes to which the internal buffer should be resized.
+ *
+ * @param curSize the current number of bytes
+ * @param minNewSize the minimum number of bytes required
+ * @return the size to which the internal buffer should be resized
+ */
+ public int getNewBufferSize(int curSize, int minNewSize)
+ {
+ if (curSize <= maxDoublingSize)
+ return Math.max(curSize << 1, minNewSize);
+ else
+ return Math.max(curSize + (curSize >> 2), minNewSize);
+ }
+}
Property changes on:
core/trunk/src/main/java/org/jboss/cache/io/ExposedByteArrayOutputStream.java
___________________________________________________________________
Name: svn:eol-style
+ native
Modified: core/trunk/src/main/java/org/jboss/cache/loader/AdjListJDBCCacheLoader.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/AdjListJDBCCacheLoader.java 2008-08-02
15:33:59 UTC (rev 6485)
+++ core/trunk/src/main/java/org/jboss/cache/loader/AdjListJDBCCacheLoader.java 2008-08-04
10:47:51 UTC (rev 6486)
@@ -5,10 +5,10 @@
import org.jboss.cache.Fqn;
import org.jboss.cache.Modification;
import org.jboss.cache.config.CacheLoaderConfig;
+import org.jboss.cache.io.ByteBuffer;
import org.jboss.cache.lock.StripedLock;
import org.jboss.cache.util.Util;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
@@ -424,13 +424,8 @@
if (node != null)
{
- // ByteArrayOutputStream baos = new ByteArrayOutputStream();
- // ObjectOutputStream oos = new ObjectOutputStream(baos);
- // oos.writeObject(node);
-
- byte[] byteStream = marshall(node);
- ByteArrayInputStream bais = new ByteArrayInputStream(byteStream);
- ps.setBinaryStream(2, bais, byteStream.length);
+ ByteBuffer byteBuffer = marshall(node);
+ ps.setBinaryStream(2, byteBuffer.getStream(), byteBuffer.getLength());
}
else
{
@@ -499,30 +494,15 @@
con = cf.getConnection();
ps = con.prepareStatement(config.getUpdateNodeSql());
- if (node == null)
- {
- //ps.setNull(1, Types.BLOB);
- // ps.setNull(1, Types.LONGVARBINARY);
- // don't set it to null - simply use an empty hash map.
- node = new HashMap<Object, Object>(0);
- }
+ if (node == null) node = Collections.emptyMap();
- // ByteArrayOutputStream baos = new ByteArrayOutputStream();
- // ObjectOutputStream oos = new ObjectOutputStream(baos);
- // oos.writeObject(node);
+ ByteBuffer byteBuffer = marshall(node);
+ ps.setBinaryStream(1, byteBuffer.getStream(), byteBuffer.getLength());
- byte[] byteStream = marshall(node);
- ByteArrayInputStream bais = new ByteArrayInputStream(byteStream);
- ps.setBinaryStream(1, bais, byteStream.length);
-
ps.setString(2, name.toString());
/*int rows = */
ps.executeUpdate();
- // if (rows != 1)
- // {
- // throw new IllegalStateException("Expected one updated row but
got " + rows);
- // }
}
catch (Exception e)
{
@@ -685,9 +665,9 @@
return getMarshaller().objectFromStream(from);
}
- protected byte[] marshall(Object obj) throws Exception
+ protected ByteBuffer marshall(Object obj) throws Exception
{
- return getMarshaller().objectToByteBuffer(obj);
+ return getMarshaller().objectToBuffer(obj);
}
private static String toUpperCase(String s)
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java 2008-08-02
15:33:59 UTC (rev 6485)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java 2008-08-04
10:47:51 UTC (rev 6486)
@@ -35,6 +35,7 @@
import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.factories.annotations.Start;
+import org.jboss.cache.io.ByteBuffer;
import org.jboss.cache.transaction.GlobalTransaction;
import org.jgroups.util.Buffer;
@@ -90,10 +91,13 @@
// implement the basic contract set in RPCDispatcher.AbstractMarshaller
public byte[] objectToByteBuffer(Object obj) throws Exception
{
- throw new RuntimeException("Needs to be overridden!");
+ Buffer b = objectToBuffer(obj);
+ byte[] bytes = new byte[b.getLength()];
+ System.arraycopy(b.getBuf(), b.getOffset(), bytes, 0, b.getLength());
+ return bytes;
}
- public Buffer objectToBuffer(Object o) throws Exception
+ public ByteBuffer objectToBuffer(Object o) throws Exception
{
throw new RuntimeException("Needs to be overridden!");
}
Modified:
core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java 2008-08-02
15:33:59 UTC (rev 6485)
+++
core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java 2008-08-04
10:47:51 UTC (rev 6486)
@@ -70,7 +70,7 @@
* is aware of {@link org.jboss.cache.commands.ReplicableCommand} objects.
*/
public RspList invokeRemoteCommands(Vector<Address> dests, ReplicableCommand
command, int mode, long timeout,
- boolean use_anycasting, boolean oob, RspFilter
filter) throws NotSerializableException
+ boolean anycasting, boolean oob, RspFilter filter)
throws NotSerializableException
{
if (dests != null && dests.isEmpty())
{
@@ -96,7 +96,7 @@
Message msg = new Message();
msg.setBuffer(buf);
if (oob) msg.setFlag(Message.OOB);
- RspList retval = super.castMessage(dests, msg, mode, timeout, use_anycasting,
filter);
+ RspList retval = super.castMessage(dests, msg, mode, timeout, anycasting, filter);
if (trace) log.trace("responses: " + retval);
// a null response is 99% likely to be due to a marshalling problem - we throw a
NSE, this needs to be changed when
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java 2008-08-02 15:33:59
UTC (rev 6485)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java 2008-08-04 10:47:51
UTC (rev 6486)
@@ -1,6 +1,7 @@
package org.jboss.cache.marshall;
import org.jboss.cache.Fqn;
+import org.jboss.cache.io.ByteBuffer;
import org.jgroups.blocks.RpcDispatcher;
import java.io.InputStream;
@@ -90,4 +91,14 @@
* @since 2.1.1
*/
RegionalizedMethodCall regionalizedMethodCallFromObjectStream(ObjectInputStream in)
throws Exception;
+
+ /**
+ * A specialized form of {@link
org.jgroups.blocks.RpcDispatcher.Marshaller2#objectToBuffer(Object)} that returns an
instance
+ * of {@link ByteBuffer} instead of {@link org.jgroups.util.Buffer}.
+ *
+ * @param o object to marshall
+ * @return a ByteBuffer
+ * @throws Exception
+ */
+ ByteBuffer objectToBuffer(Object o) throws Exception;
}
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java 2008-08-02
15:33:59 UTC (rev 6485)
+++
core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java 2008-08-04
10:47:51 UTC (rev 6486)
@@ -12,10 +12,10 @@
import org.jboss.cache.factories.ComponentRegistry;
import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.factories.annotations.Start;
-import org.jboss.cache.util.ExposedByteArrayOutputStream;
+import org.jboss.cache.io.ByteBuffer;
+import org.jboss.cache.io.ExposedByteArrayOutputStream;
import org.jboss.cache.util.Util;
import org.jboss.util.stream.MarshalledValueInputStream;
-import org.jgroups.util.Buffer;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
@@ -152,7 +152,7 @@
}
@Override
- public Buffer objectToBuffer(Object obj) throws Exception
+ public ByteBuffer objectToBuffer(Object obj) throws Exception
{
ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(baos);
@@ -165,7 +165,7 @@
out.close();
// and return bytes.
- return new Buffer(baos.toByteArray(), 0, baos.size());
+ return new ByteBuffer(baos.toByteArray(), 0, baos.size());
}
@Override
Deleted: core/trunk/src/main/java/org/jboss/cache/util/ExposedByteArrayOutputStream.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/util/ExposedByteArrayOutputStream.java 2008-08-02
15:33:59 UTC (rev 6485)
+++
core/trunk/src/main/java/org/jboss/cache/util/ExposedByteArrayOutputStream.java 2008-08-04
10:47:51 UTC (rev 6486)
@@ -1,140 +0,0 @@
-/*
- * JBoss, the OpenSource J2EE webOS
- *
- * Distributable under LGPL license.
- * See terms of license at
gnu.org.
- */
-
-package org.jboss.cache.util;
-
-import java.io.ByteArrayOutputStream;
-
-/**
- * Extends ByteArrayOutputStream, but exposes the internal buffer.
- * Using this, callers don't need to call toByteArray() which copies the
- * internal buffer.
- * <p>
- * Also overrides the superclass' behavior of always doubling the size of the
- * internal buffer any time more capacity is needed. This class doubles the
- * size until the internal buffer reaches a configurable max size (default is
- * 4MB), after which it begins growing the buffer in 25% increments. This is
- * intended to help prevent an OutOfMemoryError during a resize of a large
- * buffer.
- * </p>
- * <p>
- * A version of this class was originally created by Bela Ban as part of the
- * JGroups library.
- * </p>
- *
- * @author <a href="mailto://brian.stansberry@jboss.com">Brian
Stansberry</a>
- * @version $Id$
- */
-public class ExposedByteArrayOutputStream extends ByteArrayOutputStream
-{
- /**
- * Default buffer size after which if more buffer capacity
- * is needed the buffer will grow by 25% rather than 100%
- */
- public static final int DEFAULT_DOUBLING_SIZE = 4 * 1024 * 1024; // 4MB
-
- private int maxDoublingSize = DEFAULT_DOUBLING_SIZE;
-
- public ExposedByteArrayOutputStream()
- {
- super();
- }
-
- public ExposedByteArrayOutputStream(int size)
- {
- super(size);
- }
-
- /**
- * Creates a new byte array output stream, with a buffer capacity of
- * the specified size, in bytes.
- *
- * @param size the initial size.
- * @param maxDoublingSize the buffer size, after which if more capacity
- * is needed the buffer will grow by 25%
- * rather than 100%
- * @throws IllegalArgumentException if size is negative.
- */
- public ExposedByteArrayOutputStream(int size, int maxDoublingSize)
- {
- super(size);
- this.maxDoublingSize = maxDoublingSize;
- }
-
- /**
- * Gets the internal buffer array. Note that the length of this array
- * will almost certainly be longer than the data written to it; call
- * <code>size()</code> to get the number of bytes of actual data.
- */
- public byte[] getRawBuffer()
- {
- return buf;
- }
-
- @Override
- public void write(byte[] b, int off, int len)
- {
- if ((off < 0) || (off > b.length) || (len < 0) ||
- ((off + len) > b.length) || ((off + len) < 0))
- {
- throw new IndexOutOfBoundsException();
- }
- else if (len == 0)
- {
- return;
- }
-
- int newcount = count + len;
- if (newcount > buf.length)
- {
- byte newbuf[] = new byte[getNewBufferSize(buf.length, newcount)];
- System.arraycopy(buf, 0, newbuf, 0, count);
- buf = newbuf;
- }
-
- System.arraycopy(b, off, buf, count, len);
- count = newcount;
- }
-
- @Override
- public void write(int b)
- {
- int newcount = count + 1;
- if (newcount > buf.length)
- {
- byte newbuf[] = new byte[getNewBufferSize(buf.length, newcount)];
- System.arraycopy(buf, 0, newbuf, 0, count);
- buf = newbuf;
- }
- buf[count] = (byte) b;
- count = newcount;
- }
-
- /**
- * Gets the highest internal buffer size after which if more capacity
- * is needed the buffer will grow in 25% increments rather than 100%.
- */
- public int getMaxDoublingSize()
- {
- return maxDoublingSize;
- }
-
- /**
- * Gets the number of bytes to which the internal buffer should be resized.
- *
- * @param curSize the current number of bytes
- * @param minNewSize the minimum number of bytes required
- * @return the size to which the internal buffer should be resized
- */
- public int getNewBufferSize(int curSize, int minNewSize)
- {
- if (curSize <= maxDoublingSize)
- return Math.max(curSize << 1, minNewSize);
- else
- return Math.max(curSize + (curSize >> 2), minNewSize);
- }
-}
Modified:
core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java 2008-08-02
15:33:59 UTC (rev 6485)
+++
core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java 2008-08-04
10:47:51 UTC (rev 6486)
@@ -8,13 +8,14 @@
import org.jboss.cache.commands.tx.PrepareCommand;
import org.jboss.cache.factories.ComponentRegistry;
import org.jboss.cache.invocation.InvocationContext;
+import org.jboss.cache.io.ByteBuffer;
+import org.jboss.cache.marshall.AbstractMarshaller;
import org.jboss.cache.marshall.CommandAwareRpcDispatcher;
import org.jboss.cache.marshall.InactiveRegionAwareRpcDispatcher;
import org.jboss.cache.marshall.Marshaller;
import org.jboss.cache.marshall.RegionalizedMethodCall;
import org.jboss.cache.util.TestingUtil;
import org.jgroups.blocks.RpcDispatcher;
-import org.jgroups.util.Buffer;
import java.io.InputStream;
import java.io.ObjectInputStream;
@@ -149,7 +150,7 @@
/**
* Needed for region based marshalling.
*/
- private class RegionMarshallerDelegate implements Marshaller
+ private class RegionMarshallerDelegate extends AbstractMarshaller
{
private Marshaller realOne;
@@ -194,17 +195,12 @@
return realOne.regionalizedMethodCallFromObjectStream(in);
}
- public byte[] objectToByteBuffer(Object o) throws Exception
- {
- return realOne.objectToByteBuffer(o);
- }
-
public Object objectFromByteBuffer(byte[] bytes) throws Exception
{
return realOne.objectFromByteBuffer(bytes);
}
- public Buffer objectToBuffer(Object o) throws Exception
+ public ByteBuffer objectToBuffer(Object o) throws Exception
{
return realOne.objectToBuffer(o);
}