Author: ShaunA
Date: 2013-01-31 07:23:52 -0500 (Thu, 31 Jan 2013)
New Revision: 6482
Modified:
remoting2/branches/2.5.4.SP3_JBREM-1316/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java
Log:
JBPAPP-10610 EAP 5.1.2 Patch to add idempotency to remoting (JBREM-1316)
Modified:
remoting2/branches/2.5.4.SP3_JBREM-1316/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java
===================================================================
---
remoting2/branches/2.5.4.SP3_JBREM-1316/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java 2013-01-31
08:49:23 UTC (rev 6481)
+++
remoting2/branches/2.5.4.SP3_JBREM-1316/src/main/org/jboss/remoting/transport/socket/MicroSocketClientInvoker.java 2013-01-31
12:23:52 UTC (rev 6482)
@@ -264,6 +264,11 @@
protected boolean generalizeSocketException;
protected int writeTimeout = -1;
+
+ /**
+ * Determines if transport() should attempt retries after versionedWrite() has
succeeded.
+ */
+ protected boolean idempotent;
// Constructors
---------------------------------------------------------------------------------
@@ -286,7 +291,8 @@
pool = null;
maxPoolSize = MAX_POOL_SIZE;
onewayConnectionTimeout = ONEWAY_CONNECTION_TIMEOUT_DEFAULT;
-
+ idempotent = true;
+
try
{
setup();
@@ -421,6 +427,16 @@
this.generalizeSocketException = generalizeSocketException;
}
+ public boolean isIdempotent()
+ {
+ return idempotent;
+ }
+
+ public void setIdempotent(boolean idempotent)
+ {
+ this.idempotent = idempotent;
+ }
+
public synchronized void disconnect()
{
log.debug(this + " disconnecting ...");
@@ -795,8 +811,9 @@
int retryCount = 0;
Exception sockEx = null;
+ boolean versionedWriteComplete = false;
- for (; retryCount < numberOfCallRetries; retryCount++)
+ for (; retryCount < numberOfCallRetries && (idempotent ||
!versionedWriteComplete); retryCount++)
{
if (trace) log.trace(this + " retryCount: " + retryCount);
if (0 < tempTimeout)
@@ -859,7 +876,8 @@
//TODO: -TME so this is messed up as now ties remoting versioning to using a
marshaller type
versionedWrite(outputStream, marshaller, invocation, version);
-
+ versionedWriteComplete = true;
+
if (serverSideOneway)
{
if(trace) { log.trace(this + " sent oneway invocation, so not waiting
for response, returning null"); }
@@ -951,7 +969,7 @@
}
// need to check if ran out of retries
- if (retryCount >= numberOfCallRetries)
+ if (retryCount >= numberOfCallRetries || (response == null &&
!idempotent && versionedWriteComplete))
{
handleException(sockEx, socketWrapper);
}
@@ -1282,9 +1300,7 @@
return null;
}
- // Private
--------------------------------------------------------------------------------------
-
- private Object versionedRead(InputStream inputStream, UnMarshaller unmarshaller, int
version)
+ protected Object versionedRead(InputStream inputStream, UnMarshaller unmarshaller, int
version)
throws IOException, ClassNotFoundException
{
//TODO: -TME - is switch required?
@@ -1308,7 +1324,7 @@
}
}
- private void versionedWrite(OutputStream outputStream, Marshaller marshaller,
+ protected void versionedWrite(OutputStream outputStream, Marshaller marshaller,
Object invocation, int version) throws IOException
{
//TODO: -TME Should I worry about checking the version here? Only one way to do it
at this point
@@ -1336,7 +1352,7 @@
}
//TODO: -TME Exact same method in ServerThread
- private int readVersion(InputStream inputStream) throws IOException
+ protected int readVersion(InputStream inputStream) throws IOException
{
if (trace) { log.trace(this + " reading version from input stream"); }
int version = inputStream.read();
@@ -1345,7 +1361,7 @@
}
//TODO: -TME Exact same method in ServerThread
- private void writeVersion(OutputStream outputStream, int version) throws IOException
+ protected void writeVersion(OutputStream outputStream, int version) throws
IOException
{
if (trace) { log.trace(this + " writing version " + version + " on
output stream"); }
outputStream.write(version);
Show replies by date