Author: richard.opalka(a)jboss.com
Date: 2008-02-26 12:17:06 -0500 (Tue, 26 Feb 2008)
New Revision: 5814
Added:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/server/RMStore.java
Modified:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/common/RMHelper.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/server/RMDeploymentAspect.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/server/RMInvocationHandler.java
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/server/RMServerSequence.java
Log:
[JBWS-1828] [JBWS-1830] first round
Modified:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/common/RMHelper.java
===================================================================
---
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/common/RMHelper.java 2008-02-26
17:09:46 UTC (rev 5813)
+++
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/common/RMHelper.java 2008-02-26
17:17:06 UTC (rev 5814)
@@ -37,7 +37,6 @@
import org.jboss.ws.extensions.wsrm.RMConstant;
import org.jboss.ws.extensions.wsrm.RMClientSequence;
import org.jboss.ws.extensions.wsrm.api.RMException;
-import org.jboss.ws.extensions.wsrm.server.RMServerSequence;
import org.jboss.ws.extensions.wsrm.spi.RMConstants;
import org.jboss.ws.extensions.wsrm.spi.RMProvider;
import org.jboss.ws.extensions.wsrm.spi.protocol.RMAckRequested;
@@ -78,32 +77,6 @@
}
}
- public static RMServerSequence getServerSequenceByInboundId(String seqId,
List<RMServerSequence> sequences)
- {
- for (RMServerSequence seq : sequences)
- {
- if (seq.getInboundId().equals(seqId))
- {
- return seq;
- }
- }
-
- return null;
- }
-
- public static RMServerSequence getServerSequenceByOutboundId(String seqId,
List<RMServerSequence> sequences)
- {
- for (RMServerSequence seq : sequences)
- {
- if (seq.getOutboundId().equals(seqId))
- {
- return seq;
- }
- }
-
- return null;
- }
-
public static boolean isCreateSequence(Map<String, Object> rmMsgContext)
{
List<QName> protocolMessages =
(List<QName>)rmMsgContext.get(RMConstant.PROTOCOL_MESSAGES);
Modified:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/server/RMDeploymentAspect.java
===================================================================
---
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/server/RMDeploymentAspect.java 2008-02-26
17:09:46 UTC (rev 5813)
+++
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/server/RMDeploymentAspect.java 2008-02-26
17:17:06 UTC (rev 5814)
@@ -21,10 +21,9 @@
*/
package org.jboss.ws.extensions.wsrm.server;
-import java.util.LinkedList;
-
import org.jboss.ws.extensions.wsrm.common.RMHelper;
import org.jboss.ws.metadata.umdm.ServerEndpointMetaData;
+import org.jboss.wsf.spi.deployment.ArchiveDeployment;
import org.jboss.wsf.spi.deployment.Deployment;
import org.jboss.wsf.spi.deployment.DeploymentAspect;
import org.jboss.wsf.spi.deployment.Endpoint;
@@ -52,9 +51,9 @@
if (sepMetaData.getConfig().getRMMetaData() != null)
{
InvocationHandler origInvHandler = ep.getInvocationHandler();
- InvocationHandler wsrmInvHandler = new RMInvocationHandler(origInvHandler);
+ InvocationHandler wsrmInvHandler = new RMInvocationHandler(origInvHandler,
(ArchiveDeployment)dep);
+ // TODO: implement wsrm data dir clean up here
ep.setInvocationHandler(wsrmInvHandler);
- ep.addAttachment(RMServerSequence.class, new
LinkedList<RMServerSequence>());
RMHelper.setupRMOperations(sepMetaData);
log.info("WS-RM invocation handler associated with endpoint " +
ep.getAddress());
}
@@ -71,7 +70,6 @@
{
RMInvocationHandler rmInvHandler = (RMInvocationHandler)invHandler;
ep.setInvocationHandler(rmInvHandler.getDelegate());
- ep.removeAttachment(RMServerSequence.class);
log.info("WS-RM invocation handler removed from endpoint " +
ep.getAddress());
}
}
Modified:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/server/RMInvocationHandler.java
===================================================================
---
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/server/RMInvocationHandler.java 2008-02-26
17:09:46 UTC (rev 5813)
+++
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/server/RMInvocationHandler.java 2008-02-26
17:17:06 UTC (rev 5814)
@@ -21,6 +21,7 @@
*/
package org.jboss.ws.extensions.wsrm.server;
+import java.io.IOException;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.LinkedList;
@@ -47,9 +48,14 @@
import org.jboss.ws.extensions.wsrm.spi.protocol.RMSequenceAcknowledgement;
import org.jboss.ws.extensions.wsrm.spi.protocol.RMSerializable;
import org.jboss.ws.extensions.wsrm.spi.protocol.RMTerminateSequence;
+import org.jboss.wsf.spi.SPIProvider;
+import org.jboss.wsf.spi.SPIProviderResolver;
+import org.jboss.wsf.spi.deployment.ArchiveDeployment;
import org.jboss.wsf.spi.deployment.Endpoint;
import org.jboss.wsf.spi.invocation.Invocation;
import org.jboss.wsf.spi.invocation.InvocationHandler;
+import org.jboss.wsf.spi.management.ServerConfig;
+import org.jboss.wsf.spi.management.ServerConfigFactory;
/**
* RM Invocation Handler
@@ -63,14 +69,33 @@
private static final Logger logger = Logger.getLogger(RMInvocationHandler.class);
private static final RMConstants rmConstants = RMProvider.get().getConstants();
-
+ private ServerConfig serverConfig;
private final InvocationHandler delegate;
+ private final ArchiveDeployment dep;
+ private final String dataDir;
- RMInvocationHandler(InvocationHandler delegate)
+ RMInvocationHandler(InvocationHandler delegate, ArchiveDeployment dep)
{
+ SPIProvider spiProvider = SPIProviderResolver.getInstance().getProvider();
+ this.serverConfig =
spiProvider.getSPI(ServerConfigFactory.class).getServerConfig();
this.delegate = delegate;
+ this.dep = dep;
+ this.dataDir = getPersistLocation();
}
+ private String getPersistLocation()
+ {
+ try
+ {
+ String deploymentDir = (dep.getParent() != null ?
dep.getParent().getSimpleName() : dep.getSimpleName());
+ return serverConfig.getServerDataDir().getCanonicalPath() + "/wsrm/" +
deploymentDir;
+ }
+ catch (IOException ioe)
+ {
+ throw new IllegalStateException();
+ }
+ }
+
@Override
public final Invocation createInvocation()
{
@@ -96,7 +121,7 @@
* @param inv invocation
* @return RM response context to be set after target endpoint invocation
*/
- private Map<String, Object> prepareResponseContext(Endpoint ep, Invocation inv)
+ private synchronized Map<String, Object> prepareResponseContext(Endpoint ep,
Invocation inv)
{
CommonMessageContext msgContext = MessageContextAssociation.peekMessageContext();
AddressingProperties addrProps =
(AddressingProperties)msgContext.get(JAXWSAConstants.SERVER_ADDRESSING_PROPERTIES_INBOUND);
@@ -110,7 +135,6 @@
List<QName> protocolMessages = new LinkedList<QName>();
Map<String, Object> rmResponseContext = new HashMap<String, Object>();
- List<RMServerSequence> sequences =
(List<RMServerSequence>)ep.getAttachment(RMServerSequence.class);
rmResponseContext.put(RMConstant.PROTOCOL_MESSAGES, protocolMessages);
msgContext.remove(RMConstant.RESPONSE_CONTEXT);
RMServerSequence sequence = null;
@@ -119,7 +143,7 @@
if (RMHelper.isCreateSequence(rmReqProps))
{
sequence = new RMServerSequence();
- sequences.add(sequence);
+ RMStore.serialize(dataDir, sequence);
protocolMessages.add(rmConstants.getCreateSequenceResponseQName());
rmResponseContext.put(RMConstant.SEQUENCE_REFERENCE, sequence);
isOneWayOperation = false;
@@ -130,13 +154,15 @@
Map<QName, RMSerializable> data = (Map<QName,
RMSerializable>)rmReqProps.get(RMConstant.PROTOCOL_MESSAGES_MAPPING);
RMCloseSequence payload =
(RMCloseSequence)data.get(rmConstants.getCloseSequenceQName());
String seqIdentifier = payload.getIdentifier();
- sequence = RMHelper.getServerSequenceByInboundId(seqIdentifier, sequences);
+ sequence = RMStore.deserialize(dataDir, seqIdentifier, true);
+ //sequence = RMHelper.getServerSequenceByInboundId(seqIdentifier, sequences);
if (sequence == null)
{
throw new NotImplementedException("TODO: implement unknown sequence
fault" + seqIdentifier);
}
sequence.close();
+ RMStore.serialize(dataDir, sequence);
protocolMessages.add(rmConstants.getCloseSequenceResponseQName());
protocolMessages.add(rmConstants.getSequenceAcknowledgementQName());
rmResponseContext.put(RMConstant.SEQUENCE_REFERENCE, sequence);
@@ -148,7 +174,8 @@
Map<QName, RMSerializable> data = (Map<QName,
RMSerializable>)rmReqProps.get(RMConstant.PROTOCOL_MESSAGES_MAPPING);
RMSequenceAcknowledgement payload =
(RMSequenceAcknowledgement)data.get(rmConstants.getSequenceAcknowledgementQName());
String seqIdentifier = payload.getIdentifier();
- sequence = RMHelper.getServerSequenceByOutboundId(seqIdentifier, sequences);
+ sequence = RMStore.deserialize(dataDir, seqIdentifier, false);
+ //sequence = RMHelper.getServerSequenceByOutboundId(seqIdentifier, sequences);
if (sequence == null)
{
throw new NotImplementedException("TODO: implement unknown sequence
fault" + seqIdentifier);
@@ -161,6 +188,8 @@
sequence.addReceivedOutboundMessage(i);
}
}
+
+ RMStore.serialize(dataDir, sequence);
}
if (RMHelper.isTerminateSequence(rmReqProps))
@@ -168,13 +197,15 @@
Map<QName, RMSerializable> data = (Map<QName,
RMSerializable>)rmReqProps.get(RMConstant.PROTOCOL_MESSAGES_MAPPING);
RMTerminateSequence payload =
(RMTerminateSequence)data.get(rmConstants.getTerminateSequenceQName());
String seqIdentifier = payload.getIdentifier();
- sequence = RMHelper.getServerSequenceByInboundId(seqIdentifier, sequences);
+ sequence = RMStore.deserialize(dataDir, seqIdentifier, true);
+ //sequence = RMHelper.getServerSequenceByInboundId(seqIdentifier, sequences);
if (sequence == null)
{
throw new NotImplementedException("TODO: implement unknown sequence
fault" + seqIdentifier);
}
- sequences.remove(sequence);
+ RMStore.serialize(dataDir, sequence); // TODO: serialization of terminated
sequence results in no file
+ //sequences.remove(sequence);
if (RMProvider.get().getMessageFactory().newTerminateSequenceResponse() !=
null)
{
protocolMessages.add(rmConstants.getTerminateSequenceResponseQName());
@@ -184,7 +215,7 @@
}
else
{
- return null; // no WS-RM context propagated
+ return null; // no WS-RM context propagated - WS-RM 1.0
}
}
@@ -193,13 +224,15 @@
Map<QName, RMSerializable> data = (Map<QName,
RMSerializable>)rmReqProps.get(RMConstant.PROTOCOL_MESSAGES_MAPPING);
RMSequence payload = (RMSequence)data.get(rmConstants.getSequenceQName());
String seqIdentifier = payload.getIdentifier();
- sequence = RMHelper.getServerSequenceByInboundId(seqIdentifier, sequences);
+ sequence = RMStore.deserialize(dataDir, seqIdentifier, true);
+ //sequence = RMHelper.getServerSequenceByInboundId(seqIdentifier, sequences);
if (sequence == null)
{
throw new NotImplementedException("TODO: implement unknown sequence
fault" + seqIdentifier);
}
sequence.addReceivedInboundMessage(payload.getMessageNumber());
+ RMStore.serialize(dataDir, sequence);
protocolMessages.add(rmConstants.getSequenceAcknowledgementQName());
rmResponseContext.put(RMConstant.SEQUENCE_REFERENCE, sequence);
Modified:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/server/RMServerSequence.java
===================================================================
---
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/server/RMServerSequence.java 2008-02-26
17:09:46 UTC (rev 5813)
+++
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/server/RMServerSequence.java 2008-02-26
17:17:06 UTC (rev 5814)
@@ -21,12 +21,18 @@
*/
package org.jboss.ws.extensions.wsrm.server;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.Iterator;
import java.util.Set;
import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import org.jboss.logging.Logger;
+import org.jboss.util.NotImplementedException;
import org.jboss.ws.extensions.addressing.AddressingClientUtil;
import org.jboss.ws.extensions.wsrm.RMSequence;
@@ -41,15 +47,63 @@
{
private static final Logger logger = Logger.getLogger(RMServerSequence.class);
- private final String inboundId = AddressingClientUtil.generateMessageID().toString();
- private final String outboundId =
AddressingClientUtil.generateMessageID().toString();
- private final long duration = 10 * 60 * 1000L; // 10 minutes duration
- private final Set<Long> acknowledgedOutboundMessages = new
TreeSet<Long>();
- private final Set<Long> receivedInboundMessages = new TreeSet<Long>();
+ private String sequenceId;
+ private String inboundId;
+ private String outboundId;
+ private long creationTime;
+ private long duration;
+ private Set<Long> acknowledgedOutboundMessages = new TreeSet<Long>();
+ private Set<Long> receivedInboundMessages = new TreeSet<Long>();
private boolean closed;
- private AtomicBoolean inboundMessageAckRequested = new AtomicBoolean();
- private AtomicLong messageNumber = new AtomicLong();
+ private boolean terminated;
+ private boolean inboundMessageAckRequested;
+ private long messageNumber;
+ public RMServerSequence(File serializedSequence) throws IOException
+ {
+ ObjectInputStream ois = null;
+ try
+ {
+ ois = new ObjectInputStream(new FileInputStream(serializedSequence));
+ this.sequenceId = serializedSequence.getName();
+ this.inboundId = ois.readUTF();
+ this.outboundId = ois.readUTF();
+ this.closed = ois.readBoolean();
+ this.terminated = ois.readBoolean();
+ this.creationTime = ois.readLong();
+ this.duration = ois.readLong();
+ this.messageNumber = ois.readLong();
+ int countOfOutboundMessages = ois.readInt();
+ for (int i = 0; i < countOfOutboundMessages; i++)
+ {
+ this.acknowledgedOutboundMessages.add(ois.readLong());
+ }
+ int countOfInboundMessages = ois.readInt();
+ for (int i = 0; i < countOfInboundMessages; i++)
+ {
+ this.receivedInboundMessages.add(ois.readLong());
+ }
+ }
+ finally
+ {
+ ois.close();
+ }
+ }
+
+ public RMServerSequence()
+ {
+ this.sequenceId = "seq-" + System.currentTimeMillis() + "-" +
System.identityHashCode(this);
+ this.inboundId = AddressingClientUtil.generateMessageID().toString();
+ this.outboundId = AddressingClientUtil.generateMessageID().toString();
+ this.creationTime = System.currentTimeMillis();
+ this.duration = 10 * 60 * 1000L; // 10 minutes duration
+ }
+
+ public String getId()
+ {
+ return this.sequenceId;
+ }
+
public String getInboundId()
{
return this.inboundId;
@@ -69,6 +123,16 @@
{
return this.outboundId;
}
+
+ public void serialize()
+ {
+ throw new NotImplementedException();
+ }
+
+ public void deserialize()
+ {
+ throw new NotImplementedException();
+ }
public final void addReceivedInboundMessage(long messageId)
{
@@ -84,25 +148,25 @@
public final void ackRequested(boolean requested)
{
- this.inboundMessageAckRequested.set(requested);
+ this.inboundMessageAckRequested = requested;
logger.debug("Inbound Sequence: " + this.inboundId + ", ack
requested. Messages in the queue: " + this.receivedInboundMessages);
}
public final long newMessageNumber()
{
// no need for synchronization
- return this.messageNumber.incrementAndGet();
+ return ++this.messageNumber;
}
public final long getLastMessageNumber()
{
// no need for synchronization
- return this.messageNumber.get();
+ return this.messageNumber;
}
public final boolean isAckRequested()
{
- return this.inboundMessageAckRequested.get();
+ return this.inboundMessageAckRequested;
}
public Set<Long> getReceivedInboundMessages()
@@ -115,6 +179,36 @@
this.closed = true;
}
+ public void terminate()
+ {
+ this.terminated = true;
+ }
+
+ public byte[] toByteArray() throws IOException
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeUTF(this.inboundId);
+ oos.writeUTF(this.outboundId);
+ oos.writeBoolean(this.closed);
+ oos.writeBoolean(this.terminated);
+ oos.writeLong(this.creationTime);
+ oos.writeLong(this.duration);
+ oos.writeLong(this.messageNumber);
+ oos.writeInt(this.acknowledgedOutboundMessages.size());
+ for (Iterator<Long> i = this.acknowledgedOutboundMessages.iterator();
i.hasNext(); )
+ {
+ oos.writeLong(i.next());
+ }
+ oos.writeInt(this.receivedInboundMessages.size());
+ for (Iterator<Long> i = this.receivedInboundMessages.iterator(); i.hasNext();
)
+ {
+ oos.writeLong(i.next());
+ }
+ oos.close();
+ return baos.toByteArray();
+ }
+
public String toString()
{
return this.inboundId + " - " + this.outboundId;
Added: stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/server/RMStore.java
===================================================================
--- stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/server/RMStore.java
(rev 0)
+++
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/server/RMStore.java 2008-02-26
17:17:06 UTC (rev 5814)
@@ -0,0 +1,94 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005, JBoss Inc., and individual contributors as indicated
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.ws.extensions.wsrm.server;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.jboss.logging.Logger;
+
+/**
+ * Server side RM store that de/serializes sequences
+ *
+ * @author richard.opalka(a)jboss.com
+ */
+public final class RMStore
+{
+
+ private static final Logger logger = Logger.getLogger(RMStore.class);
+
+ public static final void serialize(String dataDir, RMServerSequence seq)
+ {
+ File dir = new File(dataDir);
+ dir.mkdirs();
+ File file = new File(dir, seq.getId());
+ FileOutputStream fos = null;
+ try
+ {
+ fos = new FileOutputStream(file);
+ fos.write(seq.toByteArray());
+ }
+ catch (IOException ioe)
+ {
+ logger.error(ioe.getMessage(), ioe);
+ }
+ finally
+ {
+ if (fos != null)
+ {
+ try
+ {
+ fos.close();
+ }
+ catch (IOException ignore)
+ {
+ // do nothing
+ }
+ }
+ }
+ }
+
+ public static final RMServerSequence deserialize(String dataDir, String seqId, boolean
inbound)
+ {
+ File[] sequences = new File(dataDir).listFiles();
+ for (int i = 0; i < sequences.length; i++)
+ {
+ try
+ {
+ RMServerSequence sequence = new RMServerSequence(sequences[i]);
+ boolean matches = inbound ? sequence.getInboundId().equals(seqId) :
sequence.getOutboundId().equals(seqId);
+ if (matches)
+ {
+ return sequence;
+ }
+ }
+ catch (IOException ioe)
+ {
+ logger.error(ioe.getMessage(), ioe);
+ }
+ }
+
+ return null;
+ }
+
+}
Property changes on:
stack/native/trunk/src/main/java/org/jboss/ws/extensions/wsrm/server/RMStore.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF