JBoss Cache SVN: r6487 - core/trunk/src/main/java/org/jboss/cache/buddyreplication.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-08-04 07:19:36 -0400 (Mon, 04 Aug 2008)
New Revision: 6487
Modified:
core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java
Log:
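Updated the ExposedByteArrayOutputStream import to its new package, o.j.c.io (the class was moved from o.j.c.util in r6486).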
Modified: core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java 2008-08-04 10:47:51 UTC (rev 6486)
+++ core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java 2008-08-04 11:19:36 UTC (rev 6487)
@@ -32,13 +32,13 @@
import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.factories.annotations.Start;
import org.jboss.cache.factories.annotations.Stop;
+import org.jboss.cache.io.ExposedByteArrayOutputStream;
import org.jboss.cache.lock.TimeoutException;
import org.jboss.cache.notifications.Notifier;
import org.jboss.cache.notifications.annotation.CacheListener;
import org.jboss.cache.notifications.annotation.ViewChanged;
import org.jboss.cache.notifications.event.ViewChangedEvent;
import org.jboss.cache.statetransfer.StateTransferManager;
-import org.jboss.cache.util.ExposedByteArrayOutputStream;
import org.jboss.cache.util.concurrent.ConcurrentHashSet;
import org.jboss.cache.util.reflect.ReflectionUtil;
import org.jboss.util.stream.MarshalledValueInputStream;
15 years, 8 months
JBoss Cache SVN: r6486 - in core/trunk/src: main/java/org/jboss/cache/io and 4 other directories.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-08-04 06:47:51 -0400 (Mon, 04 Aug 2008)
New Revision: 6486
Added:
core/trunk/src/main/java/org/jboss/cache/io/
core/trunk/src/main/java/org/jboss/cache/io/ByteBuffer.java
core/trunk/src/main/java/org/jboss/cache/io/ExposedByteArrayOutputStream.java
Removed:
core/trunk/src/main/java/org/jboss/cache/util/ExposedByteArrayOutputStream.java
Modified:
core/trunk/src/main/java/org/jboss/cache/loader/AdjListJDBCCacheLoader.java
core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java
core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java
core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java
core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java
Log:
Updated to use new JGroups Marshaller2 interface and Buffer class instead of byte arrays. Also created a new subclass of Buffer - ByteBuffer - that exposes a stream on top of the byte buffer. Finally, moved ExposedByteArrayOutputStream (EBAOS) from o.j.c.util to o.j.c.io.
Added: core/trunk/src/main/java/org/jboss/cache/io/ByteBuffer.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/io/ByteBuffer.java (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/io/ByteBuffer.java 2008-08-04 10:47:51 UTC (rev 6486)
@@ -0,0 +1,25 @@
+package org.jboss.cache.io;
+
+import org.jgroups.util.Buffer;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+
+/**
+ * A subclass of a JGroups Buffer
+ */
+public class ByteBuffer extends Buffer
+{
+ public ByteBuffer(byte[] bytes, int offset, int length)
+ {
+ super(bytes, offset, length);
+ }
+
+ /**
+ * @return an input stream for the bytes in the buffer
+ */
+ public InputStream getStream()
+ {
+ return new ByteArrayInputStream(getBuf(), getOffset(), getLength());
+ }
+}
Copied: core/trunk/src/main/java/org/jboss/cache/io/ExposedByteArrayOutputStream.java (from rev 6485, core/trunk/src/main/java/org/jboss/cache/util/ExposedByteArrayOutputStream.java)
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/io/ExposedByteArrayOutputStream.java (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/io/ExposedByteArrayOutputStream.java 2008-08-04 10:47:51 UTC (rev 6486)
@@ -0,0 +1,145 @@
+/*
+ * JBoss, the OpenSource J2EE webOS
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+
+package org.jboss.cache.io;
+
+import net.jcip.annotations.NotThreadSafe;
+
+import java.io.ByteArrayOutputStream;
+
+/**
+ * Extends ByteArrayOutputStream, but exposes the internal buffer.
+ * Using this, callers don't need to call toByteArray() which copies the
+ * internal buffer.
+ * <p>
+ * Also overrides the superclass' behavior of always doubling the size of the
+ * internal buffer any time more capacity is needed. This class doubles the
+ * size until the internal buffer reaches a configurable max size (default is
+ * 4MB), after which it begins growing the buffer in 25% increments. This is
+ * intended to help prevent an OutOfMemoryError during a resize of a large
+ * buffer.
+ * </p>
+ * <p>
+ * A version of this class was originally created by Bela Ban as part of the
+ * JGroups library.
+ * </p>
+ * This class is not threadsafe as it will not support concurrent readers and writers.
+ * <p/>
+ *
+ * @author <a href="mailto://brian.stansberry@jboss.com">Brian Stansberry</a>
+ * @version $Id$
+ */
+@NotThreadSafe
+public class ExposedByteArrayOutputStream extends ByteArrayOutputStream
+{
+ /**
+ * Default buffer size after which if more buffer capacity
+ * is needed the buffer will grow by 25% rather than 100%
+ */
+ public static final int DEFAULT_DOUBLING_SIZE = 4 * 1024 * 1024; // 4MB
+
+ private int maxDoublingSize = DEFAULT_DOUBLING_SIZE;
+
+ public ExposedByteArrayOutputStream()
+ {
+ super();
+ }
+
+ public ExposedByteArrayOutputStream(int size)
+ {
+ super(size);
+ }
+
+ /**
+ * Creates a new byte array output stream, with a buffer capacity of
+ * the specified size, in bytes.
+ *
+ * @param size the initial size.
+ * @param maxDoublingSize the buffer size, after which if more capacity
+ * is needed the buffer will grow by 25%
+ * rather than 100%
+ * @throws IllegalArgumentException if size is negative.
+ */
+ public ExposedByteArrayOutputStream(int size, int maxDoublingSize)
+ {
+ super(size);
+ this.maxDoublingSize = maxDoublingSize;
+ }
+
+ /**
+ * Gets the internal buffer array. Note that the length of this array
+ * will almost certainly be longer than the data written to it; call
+ * <code>size()</code> to get the number of bytes of actual data.
+ */
+ public byte[] getRawBuffer()
+ {
+ return buf;
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len)
+ {
+ if ((off < 0) || (off > b.length) || (len < 0) ||
+ ((off + len) > b.length) || ((off + len) < 0))
+ {
+ throw new IndexOutOfBoundsException();
+ }
+ else if (len == 0)
+ {
+ return;
+ }
+
+ int newcount = count + len;
+ if (newcount > buf.length)
+ {
+ byte newbuf[] = new byte[getNewBufferSize(buf.length, newcount)];
+ System.arraycopy(buf, 0, newbuf, 0, count);
+ buf = newbuf;
+ }
+
+ System.arraycopy(b, off, buf, count, len);
+ count = newcount;
+ }
+
+ @Override
+ public void write(int b)
+ {
+ int newcount = count + 1;
+ if (newcount > buf.length)
+ {
+ byte newbuf[] = new byte[getNewBufferSize(buf.length, newcount)];
+ System.arraycopy(buf, 0, newbuf, 0, count);
+ buf = newbuf;
+ }
+ buf[count] = (byte) b;
+ count = newcount;
+ }
+
+ /**
+ * Gets the highest internal buffer size after which if more capacity
+ * is needed the buffer will grow in 25% increments rather than 100%.
+ */
+ public int getMaxDoublingSize()
+ {
+ return maxDoublingSize;
+ }
+
+ /**
+ * Gets the number of bytes to which the internal buffer should be resized.
+ *
+ * @param curSize the current number of bytes
+ * @param minNewSize the minimum number of bytes required
+ * @return the size to which the internal buffer should be resized
+ */
+ public int getNewBufferSize(int curSize, int minNewSize)
+ {
+ if (curSize <= maxDoublingSize)
+ return Math.max(curSize << 1, minNewSize);
+ else
+ return Math.max(curSize + (curSize >> 2), minNewSize);
+ }
+}
Property changes on: core/trunk/src/main/java/org/jboss/cache/io/ExposedByteArrayOutputStream.java
___________________________________________________________________
Name: svn:eol-style
+ native
Modified: core/trunk/src/main/java/org/jboss/cache/loader/AdjListJDBCCacheLoader.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/AdjListJDBCCacheLoader.java 2008-08-02 15:33:59 UTC (rev 6485)
+++ core/trunk/src/main/java/org/jboss/cache/loader/AdjListJDBCCacheLoader.java 2008-08-04 10:47:51 UTC (rev 6486)
@@ -5,10 +5,10 @@
import org.jboss.cache.Fqn;
import org.jboss.cache.Modification;
import org.jboss.cache.config.CacheLoaderConfig;
+import org.jboss.cache.io.ByteBuffer;
import org.jboss.cache.lock.StripedLock;
import org.jboss.cache.util.Util;
-import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
@@ -424,13 +424,8 @@
if (node != null)
{
- // ByteArrayOutputStream baos = new ByteArrayOutputStream();
- // ObjectOutputStream oos = new ObjectOutputStream(baos);
- // oos.writeObject(node);
-
- byte[] byteStream = marshall(node);
- ByteArrayInputStream bais = new ByteArrayInputStream(byteStream);
- ps.setBinaryStream(2, bais, byteStream.length);
+ ByteBuffer byteBuffer = marshall(node);
+ ps.setBinaryStream(2, byteBuffer.getStream(), byteBuffer.getLength());
}
else
{
@@ -499,30 +494,15 @@
con = cf.getConnection();
ps = con.prepareStatement(config.getUpdateNodeSql());
- if (node == null)
- {
- //ps.setNull(1, Types.BLOB);
- // ps.setNull(1, Types.LONGVARBINARY);
- // don't set it to null - simply use an empty hash map.
- node = new HashMap<Object, Object>(0);
- }
+ if (node == null) node = Collections.emptyMap();
- // ByteArrayOutputStream baos = new ByteArrayOutputStream();
- // ObjectOutputStream oos = new ObjectOutputStream(baos);
- // oos.writeObject(node);
+ ByteBuffer byteBuffer = marshall(node);
+ ps.setBinaryStream(1, byteBuffer.getStream(), byteBuffer.getLength());
- byte[] byteStream = marshall(node);
- ByteArrayInputStream bais = new ByteArrayInputStream(byteStream);
- ps.setBinaryStream(1, bais, byteStream.length);
-
ps.setString(2, name.toString());
/*int rows = */
ps.executeUpdate();
- // if (rows != 1)
- // {
- // throw new IllegalStateException("Expected one updated row but got " + rows);
- // }
}
catch (Exception e)
{
@@ -685,9 +665,9 @@
return getMarshaller().objectFromStream(from);
}
- protected byte[] marshall(Object obj) throws Exception
+ protected ByteBuffer marshall(Object obj) throws Exception
{
- return getMarshaller().objectToByteBuffer(obj);
+ return getMarshaller().objectToBuffer(obj);
}
private static String toUpperCase(String s)
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java 2008-08-02 15:33:59 UTC (rev 6485)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java 2008-08-04 10:47:51 UTC (rev 6486)
@@ -35,6 +35,7 @@
import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.factories.annotations.Start;
+import org.jboss.cache.io.ByteBuffer;
import org.jboss.cache.transaction.GlobalTransaction;
import org.jgroups.util.Buffer;
@@ -90,10 +91,13 @@
// implement the basic contract set in RPCDispatcher.AbstractMarshaller
public byte[] objectToByteBuffer(Object obj) throws Exception
{
- throw new RuntimeException("Needs to be overridden!");
+ Buffer b = objectToBuffer(obj);
+ byte[] bytes = new byte[b.getLength()];
+ System.arraycopy(b.getBuf(), b.getOffset(), bytes, 0, b.getLength());
+ return bytes;
}
- public Buffer objectToBuffer(Object o) throws Exception
+ public ByteBuffer objectToBuffer(Object o) throws Exception
{
throw new RuntimeException("Needs to be overridden!");
}
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java 2008-08-02 15:33:59 UTC (rev 6485)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java 2008-08-04 10:47:51 UTC (rev 6486)
@@ -70,7 +70,7 @@
* is aware of {@link org.jboss.cache.commands.ReplicableCommand} objects.
*/
public RspList invokeRemoteCommands(Vector<Address> dests, ReplicableCommand command, int mode, long timeout,
- boolean use_anycasting, boolean oob, RspFilter filter) throws NotSerializableException
+ boolean anycasting, boolean oob, RspFilter filter) throws NotSerializableException
{
if (dests != null && dests.isEmpty())
{
@@ -96,7 +96,7 @@
Message msg = new Message();
msg.setBuffer(buf);
if (oob) msg.setFlag(Message.OOB);
- RspList retval = super.castMessage(dests, msg, mode, timeout, use_anycasting, filter);
+ RspList retval = super.castMessage(dests, msg, mode, timeout, anycasting, filter);
if (trace) log.trace("responses: " + retval);
// a null response is 99% likely to be due to a marshalling problem - we throw a NSE, this needs to be changed when
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java 2008-08-02 15:33:59 UTC (rev 6485)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java 2008-08-04 10:47:51 UTC (rev 6486)
@@ -1,6 +1,7 @@
package org.jboss.cache.marshall;
import org.jboss.cache.Fqn;
+import org.jboss.cache.io.ByteBuffer;
import org.jgroups.blocks.RpcDispatcher;
import java.io.InputStream;
@@ -90,4 +91,14 @@
* @since 2.1.1
*/
RegionalizedMethodCall regionalizedMethodCallFromObjectStream(ObjectInputStream in) throws Exception;
+
+ /**
+ * A specialized form of {@link org.jgroups.blocks.RpcDispatcher.Marshaller2#objectToBuffer(Object)} that returns an instance
+ * of {@link ByteBuffer} instead of {@link org.jgroups.util.Buffer}.
+ *
+ * @param o object to marshall
+ * @return a ByteBuffer
+ * @throws Exception
+ */
+ ByteBuffer objectToBuffer(Object o) throws Exception;
}
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java 2008-08-02 15:33:59 UTC (rev 6485)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java 2008-08-04 10:47:51 UTC (rev 6486)
@@ -12,10 +12,10 @@
import org.jboss.cache.factories.ComponentRegistry;
import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.factories.annotations.Start;
-import org.jboss.cache.util.ExposedByteArrayOutputStream;
+import org.jboss.cache.io.ByteBuffer;
+import org.jboss.cache.io.ExposedByteArrayOutputStream;
import org.jboss.cache.util.Util;
import org.jboss.util.stream.MarshalledValueInputStream;
-import org.jgroups.util.Buffer;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
@@ -152,7 +152,7 @@
}
@Override
- public Buffer objectToBuffer(Object obj) throws Exception
+ public ByteBuffer objectToBuffer(Object obj) throws Exception
{
ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(baos);
@@ -165,7 +165,7 @@
out.close();
// and return bytes.
- return new Buffer(baos.toByteArray(), 0, baos.size());
+ return new ByteBuffer(baos.toByteArray(), 0, baos.size());
}
@Override
Deleted: core/trunk/src/main/java/org/jboss/cache/util/ExposedByteArrayOutputStream.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/ExposedByteArrayOutputStream.java 2008-08-02 15:33:59 UTC (rev 6485)
+++ core/trunk/src/main/java/org/jboss/cache/util/ExposedByteArrayOutputStream.java 2008-08-04 10:47:51 UTC (rev 6486)
@@ -1,140 +0,0 @@
-/*
- * JBoss, the OpenSource J2EE webOS
- *
- * Distributable under LGPL license.
- * See terms of license at gnu.org.
- */
-
-package org.jboss.cache.util;
-
-import java.io.ByteArrayOutputStream;
-
-/**
- * Extends ByteArrayOutputStream, but exposes the internal buffer.
- * Using this, callers don't need to call toByteArray() which copies the
- * internal buffer.
- * <p>
- * Also overrides the superclass' behavior of always doubling the size of the
- * internal buffer any time more capacity is needed. This class doubles the
- * size until the internal buffer reaches a configurable max size (default is
- * 4MB), after which it begins growing the buffer in 25% increments. This is
- * intended to help prevent an OutOfMemoryError during a resize of a large
- * buffer.
- * </p>
- * <p>
- * A version of this class was originally created by Bela Ban as part of the
- * JGroups library.
- * </p>
- *
- * @author <a href="mailto://brian.stansberry@jboss.com">Brian Stansberry</a>
- * @version $Id$
- */
-public class ExposedByteArrayOutputStream extends ByteArrayOutputStream
-{
- /**
- * Default buffer size after which if more buffer capacity
- * is needed the buffer will grow by 25% rather than 100%
- */
- public static final int DEFAULT_DOUBLING_SIZE = 4 * 1024 * 1024; // 4MB
-
- private int maxDoublingSize = DEFAULT_DOUBLING_SIZE;
-
- public ExposedByteArrayOutputStream()
- {
- super();
- }
-
- public ExposedByteArrayOutputStream(int size)
- {
- super(size);
- }
-
- /**
- * Creates a new byte array output stream, with a buffer capacity of
- * the specified size, in bytes.
- *
- * @param size the initial size.
- * @param maxDoublingSize the buffer size, after which if more capacity
- * is needed the buffer will grow by 25%
- * rather than 100%
- * @throws IllegalArgumentException if size is negative.
- */
- public ExposedByteArrayOutputStream(int size, int maxDoublingSize)
- {
- super(size);
- this.maxDoublingSize = maxDoublingSize;
- }
-
- /**
- * Gets the internal buffer array. Note that the length of this array
- * will almost certainly be longer than the data written to it; call
- * <code>size()</code> to get the number of bytes of actual data.
- */
- public byte[] getRawBuffer()
- {
- return buf;
- }
-
- @Override
- public void write(byte[] b, int off, int len)
- {
- if ((off < 0) || (off > b.length) || (len < 0) ||
- ((off + len) > b.length) || ((off + len) < 0))
- {
- throw new IndexOutOfBoundsException();
- }
- else if (len == 0)
- {
- return;
- }
-
- int newcount = count + len;
- if (newcount > buf.length)
- {
- byte newbuf[] = new byte[getNewBufferSize(buf.length, newcount)];
- System.arraycopy(buf, 0, newbuf, 0, count);
- buf = newbuf;
- }
-
- System.arraycopy(b, off, buf, count, len);
- count = newcount;
- }
-
- @Override
- public void write(int b)
- {
- int newcount = count + 1;
- if (newcount > buf.length)
- {
- byte newbuf[] = new byte[getNewBufferSize(buf.length, newcount)];
- System.arraycopy(buf, 0, newbuf, 0, count);
- buf = newbuf;
- }
- buf[count] = (byte) b;
- count = newcount;
- }
-
- /**
- * Gets the highest internal buffer size after which if more capacity
- * is needed the buffer will grow in 25% increments rather than 100%.
- */
- public int getMaxDoublingSize()
- {
- return maxDoublingSize;
- }
-
- /**
- * Gets the number of bytes to which the internal buffer should be resized.
- *
- * @param curSize the current number of bytes
- * @param minNewSize the minimum number of bytes required
- * @return the size to which the internal buffer should be resized
- */
- public int getNewBufferSize(int curSize, int minNewSize)
- {
- if (curSize <= maxDoublingSize)
- return Math.max(curSize << 1, minNewSize);
- else
- return Math.max(curSize + (curSize >> 2), minNewSize);
- }
-}
Modified: core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java 2008-08-02 15:33:59 UTC (rev 6485)
+++ core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java 2008-08-04 10:47:51 UTC (rev 6486)
@@ -8,13 +8,14 @@
import org.jboss.cache.commands.tx.PrepareCommand;
import org.jboss.cache.factories.ComponentRegistry;
import org.jboss.cache.invocation.InvocationContext;
+import org.jboss.cache.io.ByteBuffer;
+import org.jboss.cache.marshall.AbstractMarshaller;
import org.jboss.cache.marshall.CommandAwareRpcDispatcher;
import org.jboss.cache.marshall.InactiveRegionAwareRpcDispatcher;
import org.jboss.cache.marshall.Marshaller;
import org.jboss.cache.marshall.RegionalizedMethodCall;
import org.jboss.cache.util.TestingUtil;
import org.jgroups.blocks.RpcDispatcher;
-import org.jgroups.util.Buffer;
import java.io.InputStream;
import java.io.ObjectInputStream;
@@ -149,7 +150,7 @@
/**
* Needed for region based marshalling.
*/
- private class RegionMarshallerDelegate implements Marshaller
+ private class RegionMarshallerDelegate extends AbstractMarshaller
{
private Marshaller realOne;
@@ -194,17 +195,12 @@
return realOne.regionalizedMethodCallFromObjectStream(in);
}
- public byte[] objectToByteBuffer(Object o) throws Exception
- {
- return realOne.objectToByteBuffer(o);
- }
-
public Object objectFromByteBuffer(byte[] bytes) throws Exception
{
return realOne.objectFromByteBuffer(bytes);
}
- public Buffer objectToBuffer(Object o) throws Exception
+ public ByteBuffer objectToBuffer(Object o) throws Exception
{
return realOne.objectToBuffer(o);
}
15 years, 8 months
JBoss Cache SVN: r6485 - core/trunk/src/test/java/org/jboss/cache/util/internals.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-08-02 11:33:59 -0400 (Sat, 02 Aug 2008)
New Revision: 6485
Modified:
core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java
Log:
Updated to use new JGroups Marshaller2 interface and Buffer class instead of byte arrays
Modified: core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java 2008-08-02 15:33:40 UTC (rev 6484)
+++ core/trunk/src/test/java/org/jboss/cache/util/internals/ReplicationListener.java 2008-08-02 15:33:59 UTC (rev 6485)
@@ -14,6 +14,7 @@
import org.jboss.cache.marshall.RegionalizedMethodCall;
import org.jboss.cache.util.TestingUtil;
import org.jgroups.blocks.RpcDispatcher;
+import org.jgroups.util.Buffer;
import java.io.InputStream;
import java.io.ObjectInputStream;
@@ -202,6 +203,16 @@
{
return realOne.objectFromByteBuffer(bytes);
}
+
+ public Buffer objectToBuffer(Object o) throws Exception
+ {
+ return realOne.objectToBuffer(o);
+ }
+
+ public Object objectFromByteBuffer(byte[] bytes, int i, int i1) throws Exception
+ {
+ return realOne.objectFromByteBuffer(bytes, i, i1);
+ }
}
/**
15 years, 8 months
JBoss Cache SVN: r6484 - core/trunk/src/main/java/org/jboss/cache/marshall.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-08-02 11:33:40 -0400 (Sat, 02 Aug 2008)
New Revision: 6484
Modified:
core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java
core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java
core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java
Log:
Updated to use new JGroups Marshaller2 interface and Buffer class instead of byte arrays
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java 2008-08-02 15:33:04 UTC (rev 6483)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java 2008-08-02 15:33:40 UTC (rev 6484)
@@ -36,6 +36,7 @@
import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.factories.annotations.Start;
import org.jboss.cache.transaction.GlobalTransaction;
+import org.jgroups.util.Buffer;
import java.io.InputStream;
import java.io.ObjectInputStream;
@@ -92,11 +93,21 @@
throw new RuntimeException("Needs to be overridden!");
}
+ public Buffer objectToBuffer(Object o) throws Exception
+ {
+ throw new RuntimeException("Needs to be overridden!");
+ }
+
public Object objectFromByteBuffer(byte[] bytes) throws Exception
{
throw new RuntimeException("Needs to be overridden!");
}
+ public Object objectFromByteBuffer(byte[] bytes, int offset, int len) throws Exception
+ {
+ throw new RuntimeException("Needs to be overridden!");
+ }
+
public Object objectFromStream(InputStream in) throws Exception
{
throw new RuntimeException("Needs to be overridden!");
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java 2008-08-02 15:33:04 UTC (rev 6483)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java 2008-08-02 15:33:40 UTC (rev 6484)
@@ -17,6 +17,7 @@
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.RpcDispatcher;
import org.jgroups.blocks.RspFilter;
+import org.jgroups.util.Buffer;
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
@@ -82,19 +83,19 @@
log.trace(new StringBuilder("dests=").append(dests).append(", command=").append(command).
append(", mode=").append(mode).append(", timeout=").append(timeout));
- byte[] buf;
+ Buffer buf;
try
{
- buf = req_marshaller.objectToByteBuffer(command);
+ buf = req_marshaller.objectToBuffer(command);
}
catch (Exception e)
{
- throw new RuntimeException("failure to marshal argument(s)", e);
+ throw new RuntimeException("Failure to marshal argument(s)", e);
}
- Message msg = new Message(null, null, buf);
- if (oob)
- msg.setFlag(Message.OOB);
+ Message msg = new Message();
+ msg.setBuffer(buf);
+ if (oob) msg.setFlag(Message.OOB);
RspList retval = super.castMessage(dests, msg, mode, timeout, use_anycasting, filter);
if (trace) log.trace("responses: " + retval);
@@ -131,7 +132,7 @@
{
try
{
- return executeCommand((ReplicableCommand) req_marshaller.objectFromByteBuffer(req.getBuffer()), req);
+ return executeCommand((ReplicableCommand) req_marshaller.objectFromByteBuffer(req.getBuffer(), req.getOffset(), req.getLength()), req);
}
catch (Throwable x)
{
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java 2008-08-02 15:33:04 UTC (rev 6483)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/Marshaller.java 2008-08-02 15:33:40 UTC (rev 6484)
@@ -31,7 +31,7 @@
* @author <a href="mailto://manik@jboss.org">Manik Surtani</a>
* @since 2.0.0
*/
-public interface Marshaller extends RpcDispatcher.Marshaller
+public interface Marshaller extends RpcDispatcher.Marshaller2
{
/**
* Marshalls an object to a given {@link ObjectOutputStream}
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java 2008-08-02 15:33:04 UTC (rev 6483)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/VersionAwareMarshaller.java 2008-08-02 15:33:40 UTC (rev 6484)
@@ -12,11 +12,12 @@
import org.jboss.cache.factories.ComponentRegistry;
import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.factories.annotations.Start;
+import org.jboss.cache.util.ExposedByteArrayOutputStream;
import org.jboss.cache.util.Util;
import org.jboss.util.stream.MarshalledValueInputStream;
+import org.jgroups.util.Buffer;
import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
@@ -151,9 +152,9 @@
}
@Override
- public byte[] objectToByteBuffer(Object obj) throws Exception
+ public Buffer objectToBuffer(Object obj) throws Exception
{
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream();
ObjectOutputStream out = new ObjectOutputStream(baos);
out.writeShort(versionInt);
@@ -164,15 +165,15 @@
out.close();
// and return bytes.
- return baos.toByteArray();
+ return new Buffer(baos.toByteArray(), 0, baos.size());
}
@Override
- public Object objectFromByteBuffer(byte[] buf) throws Exception
+ public Object objectFromByteBuffer(byte[] bytes, int offset, int len) throws Exception
{
Marshaller marshaller;
int versionId;
- ObjectInputStream in = new MarshalledValueInputStream(new ByteArrayInputStream(buf));
+ ObjectInputStream in = new MarshalledValueInputStream(new ByteArrayInputStream(bytes, offset, len));
try
{
15 years, 8 months
JBoss Cache SVN: r6483 - core/trunk/src/main/java/org/jboss/cache/util.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-08-02 11:33:04 -0400 (Sat, 02 Aug 2008)
New Revision: 6483
Modified:
core/trunk/src/main/java/org/jboss/cache/util/ExposedByteArrayOutputStream.java
Log:
Removed unnecessary synchronized blocks
Modified: core/trunk/src/main/java/org/jboss/cache/util/ExposedByteArrayOutputStream.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/ExposedByteArrayOutputStream.java 2008-08-01 15:28:13 UTC (rev 6482)
+++ core/trunk/src/main/java/org/jboss/cache/util/ExposedByteArrayOutputStream.java 2008-08-02 15:33:04 UTC (rev 6483)
@@ -76,7 +76,7 @@
}
@Override
- public synchronized void write(byte[] b, int off, int len)
+ public void write(byte[] b, int off, int len)
{
if ((off < 0) || (off > b.length) || (len < 0) ||
((off + len) > b.length) || ((off + len) < 0))
@@ -101,7 +101,7 @@
}
@Override
- public synchronized void write(int b)
+ public void write(int b)
{
int newcount = count + 1;
if (newcount > buf.length)
@@ -137,5 +137,4 @@
else
return Math.max(curSize + (curSize >> 2), minNewSize);
}
-
}
15 years, 8 months
JBoss Cache SVN: r6482 - core/trunk.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-08-01 11:28:13 -0400 (Fri, 01 Aug 2008)
New Revision: 6482
Modified:
core/trunk/pom.xml
Log:
Updated to JGroups 2.6.4.CR1
Modified: core/trunk/pom.xml
===================================================================
--- core/trunk/pom.xml 2008-08-01 13:19:09 UTC (rev 6481)
+++ core/trunk/pom.xml 2008-08-01 15:28:13 UTC (rev 6482)
@@ -27,7 +27,7 @@
<dependency>
<groupId>jgroups</groupId>
<artifactId>jgroups</artifactId>
- <version>2.6.3.GA</version>
+ <version>2.6.4.CR1</version>
</dependency>
<!-- For the JTA 1.1 API; consuming projects can safely
@@ -404,7 +404,7 @@
<dependency>
<groupId>jgroups</groupId>
<artifactId>jgroups</artifactId>
- <version>2.6.3.GA</version>
+ <version>2.6.4.CR1</version>
</dependency>
<!-- Replaces javax.transaction/jta -->
<dependency>
15 years, 8 months
JBoss Cache SVN: r6481 - core/trunk/src/main/java/org/jboss/cache/cluster.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-08-01 09:19:09 -0400 (Fri, 01 Aug 2008)
New Revision: 6481
Modified:
core/trunk/src/main/java/org/jboss/cache/cluster/ReplicationQueue.java
Log:
ReplicationQueue to use a ScheduledExecutor
Modified: core/trunk/src/main/java/org/jboss/cache/cluster/ReplicationQueue.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/cluster/ReplicationQueue.java 2008-08-01 13:12:46 UTC (rev 6480)
+++ core/trunk/src/main/java/org/jboss/cache/cluster/ReplicationQueue.java 2008-08-01 13:19:09 UTC (rev 6481)
@@ -14,8 +14,11 @@
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
-import java.util.Timer;
-import java.util.TimerTask;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Periodically (or when certain size is exceeded) takes elements and replicates them.
@@ -29,11 +32,6 @@
private static final Log log = LogFactory.getLog(ReplicationQueue.class);
/**
- * We flush every 5 seconds. Inactive if -1 or 0
- */
- private long interval = 5000;
-
- /**
* Max elements before we flush
*/
private long max_elements = 500;
@@ -46,18 +44,13 @@
/**
* For periodical replication
*/
- private Timer timer = null;
-
- /**
- * The timer task, only calls flush() when executed by Timer
- */
- private ReplicationQueue.MyTask task = null;
+ private ScheduledExecutorService scheduledExecutor = null;
private RPCManager rpcManager;
private Configuration configuration;
private boolean enabled;
private CommandsFactory commandsFactory;
+ private static final AtomicInteger counter = new AtomicInteger(0);
-
public boolean isEnabled()
{
return enabled;
@@ -80,7 +73,7 @@
@Start
public synchronized void start()
{
- this.interval = configuration.getReplQueueInterval();
+ long interval = configuration.getReplQueueInterval();
this.max_elements = configuration.getReplQueueMaxElements();
// check again
enabled = configuration.isUseReplQueue() && (configuration.getBuddyReplicationConfig() == null || !configuration.getBuddyReplicationConfig().isEnabled());
@@ -88,14 +81,22 @@
{
if (interval > 0)
{
- if (task == null)
- task = new ReplicationQueue.MyTask();
- if (timer == null)
+ if (scheduledExecutor == null)
{
- timer = new Timer(true);
- timer.schedule(task,
- 500, // delay before initial flush
- interval); // interval between flushes
+ scheduledExecutor = Executors.newSingleThreadScheduledExecutor(new ThreadFactory()
+ {
+ public Thread newThread(Runnable r)
+ {
+ return new Thread(r, "ReplicationQueue-periodicProcessor-" + counter.getAndIncrement());
+ }
+ });
+ scheduledExecutor.scheduleWithFixedDelay(new Runnable()
+ {
+ public void run()
+ {
+ flush();
+ }
+ }, 500l, interval, TimeUnit.MILLISECONDS);
}
}
}
@@ -107,16 +108,11 @@
@Stop
public synchronized void stop()
{
- if (task != null)
+ if (scheduledExecutor != null)
{
- task.cancel();
- task = null;
+ scheduledExecutor.shutdownNow();
}
- if (timer != null)
- {
- timer.cancel();
- timer = null;
- }
+ scheduledExecutor = null;
}
@@ -164,13 +160,4 @@
}
}
}
-
- class MyTask extends TimerTask
- {
- @Override
- public void run()
- {
- flush();
- }
- }
}
\ No newline at end of file
15 years, 8 months