[jboss-svn-commits] JBL Code SVN: r35938 - in labs/jbossrules/trunk/drools-grid/drools-grid-impl/src: main/java/org/drools/grid/remote/command and 1 other directories.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Wed Nov 10 11:41:48 EST 2010
Author: salaboy21
Date: 2010-11-10 11:41:47 -0500 (Wed, 10 Nov 2010)
New Revision: 35938
Added:
labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/ConversationUtil.java
labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/WorkingMemoryEntryPointRemoteClient.java
labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/command/
labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/command/GetWorkingMemoryEntryPointRemoteCommand.java
Modified:
labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/GridNodeRemoteClient.java
labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/KnowledgeBaseProviderRemoteClient.java
labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/KnowledgeBaseRemoteClient.java
labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/KnowledgeBuilderProviderRemoteClient.java
labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/KnowledgeBuilderRemoteClient.java
labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/StatefulKnowledgeSessionRemoteClient.java
labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/test/java/org/drools/grid/NodeTests.java
Log:
JBRULES-2772: Drools Grid Impl2 remoting features (socket)
- NodeTests, Add ConversationUtil with send message
- Adding more remoting implementations
Added: labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/ConversationUtil.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/ConversationUtil.java (rev 0)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/ConversationUtil.java 2010-11-10 16:41:47 UTC (rev 35938)
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2010 salaboy.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * under the License.
+ */
+
+package org.drools.grid.remote;
+
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import org.drools.grid.internal.responsehandlers.BlockingMessageResponseHandler;
+import org.drools.grid.io.Conversation;
+import org.drools.grid.io.ConversationManager;
+
+/**
+ *
+ * @author salaboy
+ */
+public class ConversationUtil {
+ public static Object sendMessage(ConversationManager conversationManager,
+ Serializable addr,
+ String id,
+ Object body) {
+
+ InetSocketAddress[] sockets = null;
+ if ( addr instanceof InetSocketAddress[] ) {
+ sockets = (InetSocketAddress[]) addr;
+ } else if ( addr instanceof InetSocketAddress ) {
+ sockets = new InetSocketAddress[ 1 ];
+ sockets[0] = (InetSocketAddress) addr;
+ }
+
+ BlockingMessageResponseHandler handler = new BlockingMessageResponseHandler();
+ Exception exception = null;
+ for ( InetSocketAddress socket : sockets ) {
+ try {
+ Conversation conv = conversationManager.startConversation( socket,
+ id );
+ conv.sendMessage( body,
+ handler );
+ exception = null;
+ } catch ( Exception e ) {
+ exception = e;
+ conversationManager.endConversation();
+ }
+ if ( exception == null ) {
+ break;
+ }
+ }
+ if ( exception != null ) {
+ throw new RuntimeException( "Unable to send message",
+ exception );
+ }
+ try {
+ return handler.getMessage().getBody();
+ } finally {
+ conversationManager.endConversation();
+ }
+ }
+}
Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/GridNodeRemoteClient.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/GridNodeRemoteClient.java 2010-11-10 14:58:10 UTC (rev 35937)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/GridNodeRemoteClient.java 2010-11-10 16:41:47 UTC (rev 35938)
@@ -41,7 +41,8 @@
private GridServiceDescription gsd;
private final Map<String, Object> localContext = new ConcurrentHashMap<String, Object>();
private final ServiceRegistry serviceRegistry = ServiceRegistryImpl.getInstance();
-
+ private MinaConnector connector = new MinaConnector();
+
public GridNodeRemoteClient(GridServiceDescription gsd) {
this.gsd = gsd;
init( this.localContext );
@@ -73,7 +74,7 @@
public void init(Object context) {
- MinaConnector connector = new MinaConnector();
+
ConversationManager cm = new ConversationManagerImpl( this.gsd.getId(),
connector,
SystemEventListenerFactory.getSystemEventListener() );
@@ -87,7 +88,7 @@
}
public void dispose() {
- throw new UnsupportedOperationException( "Not supported yet." );
+ connector.close();
}
}
Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/KnowledgeBaseProviderRemoteClient.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/KnowledgeBaseProviderRemoteClient.java 2010-11-10 14:58:10 UTC (rev 35937)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/KnowledgeBaseProviderRemoteClient.java 2010-11-10 16:41:47 UTC (rev 35938)
@@ -78,7 +78,7 @@
localId,
new NewKnowledgeBaseCommand( null ) ) } ) );
- sendMessage( this.cm,
+ ConversationUtil.sendMessage( this.cm,
(InetSocketAddress) this.gsd.getAddresses().get( "socket" ).getObject(),
this.gsd.getId(),
cmd );
@@ -106,44 +106,5 @@
throw new UnsupportedOperationException( "Not supported yet." );
}
- public static Object sendMessage(ConversationManager conversationManager,
- Serializable addr,
- String id,
- Object body) {
-
- InetSocketAddress[] sockets = null;
- if ( addr instanceof InetSocketAddress[] ) {
- sockets = (InetSocketAddress[]) addr;
- } else if ( addr instanceof InetSocketAddress ) {
- sockets = new InetSocketAddress[ 1 ];
- sockets[0] = (InetSocketAddress) addr;
- }
-
- BlockingMessageResponseHandler handler = new BlockingMessageResponseHandler();
- Exception exception = null;
- for ( InetSocketAddress socket : sockets ) {
- try {
- Conversation conv = conversationManager.startConversation( socket,
- id );
- conv.sendMessage( body,
- handler );
- exception = null;
- } catch ( Exception e ) {
- exception = e;
- conversationManager.endConversation();
- }
- if ( exception == null ) {
- break;
- }
- }
- if ( exception != null ) {
- throw new RuntimeException( "Unable to send message",
- exception );
- }
- try {
- return handler.getMessage().getBody();
- } finally {
- conversationManager.endConversation();
- }
- }
+
}
Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/KnowledgeBaseRemoteClient.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/KnowledgeBaseRemoteClient.java 2010-11-10 14:58:10 UTC (rev 35937)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/KnowledgeBaseRemoteClient.java 2010-11-10 16:41:47 UTC (rev 35938)
@@ -77,7 +77,7 @@
null,
kresultsId ) } ) );
- sendMessage( this.cm,
+ ConversationUtil.sendMessage( this.cm,
(InetSocketAddress) this.gsd.getAddresses().get( "socket" ).getObject(),
this.gsd.getId(),
cmd );
@@ -152,7 +152,7 @@
null,
kresultsId ) ) } ) );
- sendMessage( this.cm,
+ ConversationUtil.sendMessage( this.cm,
(InetSocketAddress) this.gsd.getAddresses().get( "socket" ).getObject(),
this.gsd.getId(),
cmd );
@@ -192,45 +192,5 @@
throw new UnsupportedOperationException( "Not supported yet." );
}
- public static Object sendMessage(ConversationManager conversationManager,
- Serializable addr,
- String id,
- Object body) {
-
- InetSocketAddress[] sockets = null;
- if ( addr instanceof InetSocketAddress[] ) {
- sockets = (InetSocketAddress[]) addr;
- } else if ( addr instanceof InetSocketAddress ) {
- sockets = new InetSocketAddress[ 1 ];
- sockets[0] = (InetSocketAddress) addr;
- }
-
- BlockingMessageResponseHandler handler = new BlockingMessageResponseHandler();
- Exception exception = null;
- for ( InetSocketAddress socket : sockets ) {
- try {
- Conversation conv = conversationManager.startConversation( socket,
- id );
- conv.sendMessage( body,
- handler );
- exception = null;
- } catch ( Exception e ) {
- exception = e;
- conversationManager.endConversation();
- }
- if ( exception == null ) {
- break;
- }
- }
- if ( exception != null ) {
- throw new RuntimeException( "Unable to send message",
- exception );
- }
- try {
- return handler.getMessage().getBody();
- } finally {
- conversationManager.endConversation();
- }
- }
-
+
}
Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/KnowledgeBuilderProviderRemoteClient.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/KnowledgeBuilderProviderRemoteClient.java 2010-11-10 14:58:10 UTC (rev 35937)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/KnowledgeBuilderProviderRemoteClient.java 2010-11-10 16:41:47 UTC (rev 35938)
@@ -17,7 +17,6 @@
package org.drools.grid.remote;
import com.sun.tools.xjc.Options;
-import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Properties;
@@ -32,8 +31,6 @@
import org.drools.command.builder.NewKnowledgeBuilderCommand;
import org.drools.grid.GridNode;
import org.drools.grid.GridServiceDescription;
-import org.drools.grid.internal.responsehandlers.BlockingMessageResponseHandler;
-import org.drools.grid.io.Conversation;
import org.drools.grid.io.ConversationManager;
import org.drools.grid.io.impl.CommandImpl;
@@ -76,7 +73,7 @@
localId,
new NewKnowledgeBuilderCommand( null ) ) } ) );
- sendMessage( this.cm,
+ ConversationUtil.sendMessage( this.cm,
(InetSocketAddress) this.gsd.getAddresses().get( "socket" ).getObject(),
this.gsd.getId(),
cmd );
@@ -105,44 +102,5 @@
throw new UnsupportedOperationException( "Not supported yet." );
}
- public static Object sendMessage(ConversationManager conversationManager,
- Serializable addr,
- String recipientId,
- Object body) {
-
- InetSocketAddress[] sockets = null;
- if ( addr instanceof InetSocketAddress[] ) {
- sockets = (InetSocketAddress[]) addr;
- } else if ( addr instanceof InetSocketAddress ) {
- sockets = new InetSocketAddress[ 1 ];
- sockets[0] = (InetSocketAddress) addr;
- }
-
- BlockingMessageResponseHandler handler = new BlockingMessageResponseHandler();
- Exception exception = null;
- for ( InetSocketAddress socket : sockets ) {
- try {
- Conversation conv = conversationManager.startConversation( socket,
- recipientId );
- conv.sendMessage( body,
- handler );
- exception = null;
- } catch ( Exception e ) {
- exception = e;
- conversationManager.endConversation();
- }
- if ( exception == null ) {
- break;
- }
- }
- if ( exception != null ) {
- throw new RuntimeException( "Unable to send message",
- exception );
- }
- try {
- return handler.getMessage().getBody();
- } finally {
- conversationManager.endConversation();
- }
- }
+
}
Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/KnowledgeBuilderRemoteClient.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/KnowledgeBuilderRemoteClient.java 2010-11-10 14:58:10 UTC (rev 35937)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/KnowledgeBuilderRemoteClient.java 2010-11-10 16:41:47 UTC (rev 35938)
@@ -17,12 +17,9 @@
package org.drools.grid.remote;
-import java.io.Serializable;
import java.net.InetSocketAddress;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.List;
import java.util.UUID;
import org.drools.KnowledgeBase;
import org.drools.builder.KnowledgeBuilder;
@@ -32,18 +29,13 @@
import org.drools.command.KnowledgeContextResolveFromContextCommand;
import org.drools.command.builder.KnowledgeBuilderAddCommand;
import org.drools.command.builder.KnowledgeBuilderGetErrorsCommand;
-import org.drools.command.impl.GenericCommand;
-import org.drools.command.runtime.BatchExecutionCommandImpl;
import org.drools.definition.KnowledgePackage;
import org.drools.grid.GridNode;
import org.drools.grid.GridServiceDescription;
-import org.drools.grid.internal.responsehandlers.BlockingMessageResponseHandler;
-import org.drools.grid.io.Conversation;
import org.drools.grid.io.ConversationManager;
import org.drools.grid.io.impl.CollectionClient;
import org.drools.grid.io.impl.CommandImpl;
import org.drools.io.Resource;
-import org.drools.runtime.ExecutionResults;
/**
*
@@ -88,7 +80,7 @@
null,
null ) } ) );
- sendMessage( this.cm,
+ ConversationUtil.sendMessage( this.cm,
(InetSocketAddress) this.gsd.getAddresses().get( "socket" ).getObject(),
this.gsd.getId(),
cmd );
@@ -119,7 +111,7 @@
null,
kresultsId ) } ) );
- Object result = sendMessage( this.cm,
+ Object result = ConversationUtil.sendMessage( this.cm,
(InetSocketAddress) this.gsd.getAddresses().get( "socket" ).getObject(),
this.gsd.getId(),
cmd );
@@ -127,46 +119,5 @@
return (KnowledgeBuilderErrors) result;
}
-
- public static Object sendMessage(ConversationManager conversationManager,
- Serializable addr,
- String id,
- Object body) {
-
- InetSocketAddress[] sockets = null;
- if ( addr instanceof InetSocketAddress[] ) {
- sockets = (InetSocketAddress[]) addr;
- } else if ( addr instanceof InetSocketAddress ) {
- sockets = new InetSocketAddress[ 1 ];
- sockets[0] = (InetSocketAddress) addr;
- }
-
- BlockingMessageResponseHandler handler = new BlockingMessageResponseHandler();
- Exception exception = null;
- for ( InetSocketAddress socket : sockets ) {
- try {
- Conversation conv = conversationManager.startConversation( socket,
- id );
- conv.sendMessage( body,
- handler );
- exception = null;
- } catch ( Exception e ) {
- exception = e;
- conversationManager.endConversation();
- }
- if ( exception == null ) {
- break;
- }
- }
- if ( exception != null ) {
- throw new RuntimeException( "Unable to send message",
- exception );
- }
- try {
- return handler.getMessage().getBody();
- } finally {
- conversationManager.endConversation();
- }
- }
-
+
}
Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/StatefulKnowledgeSessionRemoteClient.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/StatefulKnowledgeSessionRemoteClient.java 2010-11-10 14:58:10 UTC (rev 35937)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/StatefulKnowledgeSessionRemoteClient.java 2010-11-10 16:41:47 UTC (rev 35938)
@@ -17,7 +17,6 @@
package org.drools.grid.remote;
-import java.io.Serializable;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collection;
@@ -26,16 +25,19 @@
import org.drools.command.Command;
import org.drools.command.CommandFactory;
import org.drools.command.KnowledgeContextResolveFromContextCommand;
+import org.drools.command.runtime.process.SignalEventCommand;
+import org.drools.command.runtime.process.StartProcessCommand;
+import org.drools.command.runtime.rule.GetFactHandleCommand;
import org.drools.command.runtime.rule.InsertObjectCommand;
+import org.drools.command.runtime.rule.UpdateCommand;
import org.drools.event.process.ProcessEventListener;
import org.drools.event.rule.AgendaEventListener;
import org.drools.event.rule.WorkingMemoryEventListener;
import org.drools.grid.GridNode;
import org.drools.grid.GridServiceDescription;
-import org.drools.grid.internal.responsehandlers.BlockingMessageResponseHandler;
-import org.drools.grid.io.Conversation;
import org.drools.grid.io.ConversationManager;
import org.drools.grid.io.impl.CommandImpl;
+import org.drools.grid.remote.command.GetWorkingMemoryEntryPointRemoteCommand;
import org.drools.runtime.Calendars;
import org.drools.runtime.Channel;
import org.drools.runtime.Environment;
@@ -91,38 +93,14 @@
this.instanceId,
kresultsId ) } ) );
- Object result = sendMessage( this.cm,
+ Object result = ConversationUtil.sendMessage( this.cm,
(InetSocketAddress) this.gsd.getAddresses().get( "socket" ).getObject(),
this.gsd.getId(),
cmd );
return (Integer) result;
- // String commandId = "ksession.fireAllRules" + this.messageSession.getNextId();
- // String kresultsId = "kresults_" + this.messageSession.getSessionId();
- //
- // Message msg = new Message( this.messageSession.getSessionId(),
- // this.messageSession.counter.incrementAndGet(),
- // false,
- // new KnowledgeContextResolveFromContextCommand( CommandFactory.newFireAllRules( commandId ),
- // null,
- // null,
- // this.instanceId,
- // kresultsId ) );
- // try {
- // this.connector.connect();
- // Object object = this.connector.write( msg ).getPayload();
- //
- // if ( object == null ) {
- // throw new RuntimeException( "Response was not correctly received" );
- // }
- // this.connector.disconnect();
- // //return (Integer) ((ExecutionResults) object).getValue(commandId);
- // return (Integer) object;
- // } catch ( Exception e ) {
- // throw new RuntimeException( "Unable to execute message",
- // e );
- // }
+
}
public int fireAllRules(int max) {
@@ -142,50 +120,7 @@
}
public <T> T execute(Command<T> command) {
- // String localId = UUID.randomUUID().toString();
- // String commandId = "ksession.execute" + this.gsd.getId();
- // String kresultsId = "kresults_" + this.gsd.getId();
- // CommandImpl cmd = new CommandImpl("execute",
- // Arrays.asList(new Object[]{new KnowledgeContextResolveFromContextCommand( new ExecuteCommand( command ),
- // null,
- // null,
- // this.instanceId,
- // kresultsId )}));
- //
- // Object result = sendMessage(this.cm,
- // (InetSocketAddress[]) this.gsd.getAddresses().get("socket").getObject(),
- // this.gsd.getServiceInterface().getName(),
- // cmd);
- //
- //
- //
- // return (T) result;
-
- // String commandId = "ksession.execute" + this.messageSession.getNextId();
- // String kresultsId = "kresults_" + this.messageSession.getSessionId();
- //
- // Message msg = new Message( this.messageSession.getSessionId(),
- // this.messageSession.counter.incrementAndGet(),
- // false,
- // new KnowledgeContextResolveFromContextCommand( new ExecuteCommand( commandId,
- // command ),
- // null,
- // null,
- // this.instanceId,
- // kresultsId ) );
- //
- // try {
- // this.connector.connect();
- // Object object = this.connector.write( msg ).getPayload();
- // if ( object == null ) {
- // throw new RuntimeException( "Response was not correctly received" );
- // }
- // this.connector.disconnect();
- // return (ExecutionResults) ((ExecutionResults) object).getValue( commandId );
- // } catch ( Exception e ) {
- // throw new RuntimeException( "Unable to execute message",
- // e );
- // }
+
throw new UnsupportedOperationException( "Not supported yet." );
}
@@ -196,11 +131,38 @@
public void setGlobal(String identifier,
Object object) {
- throw new UnsupportedOperationException( "Not supported yet." );
+ String kresultsId = "kresults_" + this.gsd.getId();
+ CommandImpl cmd = new CommandImpl("execute",
+ Arrays.asList(new Object[]{ new KnowledgeContextResolveFromContextCommand( CommandFactory.newSetGlobal( identifier,
+ object ),
+ null,
+ null,
+ this.instanceId,
+ kresultsId )}));
+
+ ConversationUtil.sendMessage(this.cm,
+ (InetSocketAddress) this.gsd.getAddresses().get("socket").getObject(),
+ this.gsd.getId(),
+ cmd);
}
public Object getGlobal(String identifier) {
- throw new UnsupportedOperationException( "Not supported yet." );
+ String kresultsId = "kresults_" + this.gsd.getId();
+ CommandImpl cmd = new CommandImpl("execute",
+ Arrays.asList(new Object[]{ new KnowledgeContextResolveFromContextCommand( CommandFactory.newGetGlobal( identifier ),
+ null,
+ null,
+ this.instanceId,
+ kresultsId )}));
+
+ Object result = ConversationUtil.sendMessage(this.cm,
+ (InetSocketAddress) this.gsd.getAddresses().get("socket").getObject(),
+ this.gsd.getId(),
+ cmd);
+
+
+
+ return result;
}
public Globals getGlobals() {
@@ -254,7 +216,26 @@
}
public WorkingMemoryEntryPoint getWorkingMemoryEntryPoint(String name) {
- throw new UnsupportedOperationException( "Not supported yet." );
+ String kresultsId = "kresults_" + this.gsd.getId();
+ CommandImpl cmd = new CommandImpl("execute",
+ Arrays.asList(new Object[]{ new KnowledgeContextResolveFromContextCommand( new GetWorkingMemoryEntryPointRemoteCommand( name ),
+ null,
+ null,
+ this.instanceId,
+ name,
+ kresultsId )}));
+
+ ConversationUtil.sendMessage(this.cm,
+ (InetSocketAddress[]) this.gsd.getAddresses().get("socket").getObject(),
+ this.gsd.getId(),
+ cmd);
+
+
+
+ return new WorkingMemoryEntryPointRemoteClient( this.instanceId,
+ name,
+ this.gsd,
+ this.cm );
}
public Collection< ? extends WorkingMemoryEntryPoint> getWorkingMemoryEntryPoints() {
@@ -287,7 +268,7 @@
this.instanceId,
kresultsId ) } ) );
- Object result = sendMessage( this.cm,
+ Object result = ConversationUtil.sendMessage( this.cm,
(InetSocketAddress) this.gsd.getAddresses().get( "socket" ).getObject(),
this.gsd.getId(),
cmd );
@@ -296,16 +277,51 @@
}
public void retract(FactHandle handle) {
- throw new UnsupportedOperationException( "Not supported yet." );
+ String kresultsId = "kresults_" + this.gsd.getId();
+ CommandImpl cmd = new CommandImpl("execute",
+ Arrays.asList(new Object[]{ new KnowledgeContextResolveFromContextCommand( CommandFactory.newRetract( handle ),
+ null,
+ null,
+ this.instanceId,
+ kresultsId )}));
+
+ ConversationUtil.sendMessage(this.cm,
+ (InetSocketAddress) this.gsd.getAddresses().get("socket").getObject(),
+ this.gsd.getId(),
+ cmd);
}
public void update(FactHandle handle,
Object object) {
- throw new UnsupportedOperationException( "Not supported yet." );
+ String kresultsId = "kresults_" + this.gsd.getId();
+ CommandImpl cmd = new CommandImpl("execute",
+ Arrays.asList(new Object[]{ new KnowledgeContextResolveFromContextCommand( new UpdateCommand( handle, object ),
+ null,
+ null,
+ this.instanceId,
+ kresultsId )}));
+
+ ConversationUtil.sendMessage(this.cm,
+ (InetSocketAddress) this.gsd.getAddresses().get("socket").getObject(),
+ this.gsd.getId(),
+ cmd);
}
public FactHandle getFactHandle(Object object) {
- throw new UnsupportedOperationException( "Not supported yet." );
+ String kresultsId = "kresults_" + this.gsd.getId();
+ CommandImpl cmd = new CommandImpl("execute",
+ Arrays.asList(new Object[]{ new KnowledgeContextResolveFromContextCommand( new GetFactHandleCommand( object, true ),
+ null,
+ null,
+ this.instanceId,
+ kresultsId )}));
+
+ Object result = ConversationUtil.sendMessage(this.cm,
+ (InetSocketAddress) this.gsd.getAddresses().get("socket").getObject(),
+ this.gsd.getId(),
+ cmd);
+
+ return (FactHandle) result;
}
public Object getObject(FactHandle factHandle) {
@@ -333,23 +349,60 @@
}
public ProcessInstance startProcess(String processId) {
- throw new UnsupportedOperationException( "Not supported yet." );
+ return startProcess(processId, null);
}
public ProcessInstance startProcess(String processId,
Map<String, Object> parameters) {
- throw new UnsupportedOperationException( "Not supported yet." );
+ String kresultsId = "kresults_" + this.gsd.getId();
+ CommandImpl cmd = new CommandImpl("execute",
+ Arrays.asList(new Object[]{ new KnowledgeContextResolveFromContextCommand( new StartProcessCommand( processId, parameters ),
+ null,
+ null,
+ this.instanceId,
+ kresultsId )}));
+
+ Object result = ConversationUtil.sendMessage(this.cm,
+ (InetSocketAddress) this.gsd.getAddresses().get("socket").getObject(),
+ this.gsd.getId(),
+ cmd);
+
+
+
+ return (ProcessInstance) result;
}
public void signalEvent(String type,
Object event) {
- throw new UnsupportedOperationException( "Not supported yet." );
+ String kresultsId = "kresults_" + this.gsd.getId();
+ CommandImpl cmd = new CommandImpl("execute",
+ Arrays.asList(new Object[]{ new KnowledgeContextResolveFromContextCommand( new SignalEventCommand( type, event ),
+ null,
+ null,
+ this.instanceId,
+ kresultsId )}));
+
+ ConversationUtil.sendMessage(this.cm,
+ (InetSocketAddress) this.gsd.getAddresses().get("socket").getObject(),
+ this.gsd.getId(),
+ cmd);
}
public void signalEvent(String type,
Object event,
long processInstanceId) {
- throw new UnsupportedOperationException( "Not supported yet." );
+ String kresultsId = "kresults_" + this.gsd.getId();
+ CommandImpl cmd = new CommandImpl("execute",
+ Arrays.asList(new Object[]{ new KnowledgeContextResolveFromContextCommand( new SignalEventCommand( type, event ),
+ null,
+ null,
+ this.instanceId,
+ kresultsId )}));
+
+ ConversationUtil.sendMessage(this.cm,
+ (InetSocketAddress) this.gsd.getAddresses().get("socket").getObject(),
+ this.gsd.getId(),
+ cmd);
}
public Collection<ProcessInstance> getProcessInstances() {
@@ -404,44 +457,5 @@
throw new UnsupportedOperationException( "Not supported yet." );
}
- public static Object sendMessage(ConversationManager conversationManager,
- Serializable addr,
- String id,
- Object body) {
-
- InetSocketAddress[] sockets = null;
- if ( addr instanceof InetSocketAddress[] ) {
- sockets = (InetSocketAddress[]) addr;
- } else if ( addr instanceof InetSocketAddress ) {
- sockets = new InetSocketAddress[ 1 ];
- sockets[0] = (InetSocketAddress) addr;
- }
-
- BlockingMessageResponseHandler handler = new BlockingMessageResponseHandler();
- Exception exception = null;
- for ( InetSocketAddress socket : sockets ) {
- try {
- Conversation conv = conversationManager.startConversation( socket,
- id );
- conv.sendMessage( body,
- handler );
- exception = null;
- } catch ( Exception e ) {
- exception = e;
- conversationManager.endConversation();
- }
- if ( exception == null ) {
- break;
- }
- }
- if ( exception != null ) {
- throw new RuntimeException( "Unable to send message",
- exception );
- }
- try {
- return handler.getMessage().getBody();
- } finally {
- conversationManager.endConversation();
- }
- }
+
}
Copied: labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/WorkingMemoryEntryPointRemoteClient.java (from rev 35787, labs/jbossrules/trunk/drools-grid/drools-grid-remote-api/src/main/java/org/drools/grid/remote/WorkingMemoryEntryPointRemoteClient.java)
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/WorkingMemoryEntryPointRemoteClient.java (rev 0)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/WorkingMemoryEntryPointRemoteClient.java 2010-11-10 16:41:47 UTC (rev 35938)
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2010 salaboy.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * under the License.
+ */
+
+package org.drools.grid.remote;
+
+import java.io.Serializable;
+import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.drools.FactException;
+import org.drools.FactHandle;
+import org.drools.WorkingMemoryEntryPoint;
+import org.drools.command.KnowledgeContextResolveFromContextCommand;
+import org.drools.command.runtime.rule.InsertObjectCommand;
+import org.drools.grid.GridServiceDescription;
+import org.drools.grid.internal.responsehandlers.BlockingMessageResponseHandler;
+import org.drools.grid.io.Conversation;
+import org.drools.grid.io.ConversationManager;
+import org.drools.grid.io.impl.CommandImpl;
+import org.drools.grid.service.directory.Address;
+import org.drools.runtime.ObjectFilter;
+
+/**
+ *
+ * @author salaboy
+ */
+public class WorkingMemoryEntryPointRemoteClient
+ implements
+ WorkingMemoryEntryPoint {
+
+ private String instanceId;
+ private String name;
+ private GridServiceDescription gsd;
+ private ConversationManager cm;
+
+
+ public WorkingMemoryEntryPointRemoteClient(String instanceId, String name,
+ GridServiceDescription gsd,
+ ConversationManager cm) {
+ this.instanceId = instanceId;
+ this.name = name;
+ this.gsd = gsd;
+ this.cm = cm;
+
+ }
+
+ public FactHandle insert(Object object) throws FactException {
+
+
+ String kresultsId = "kresults_" + this.gsd.getId();
+
+ InsertObjectCommand insertCmd = new InsertObjectCommand( object, true );
+ insertCmd.setEntryPoint(name);
+ CommandImpl cmd = new CommandImpl("execute",
+ Arrays.asList(new Object[]{ new KnowledgeContextResolveFromContextCommand( insertCmd,
+ null,
+ null,
+ this.instanceId,
+ this.name,
+ kresultsId )}));
+
+ Object result = ConversationUtil.sendMessage(this.cm,
+ ((InetSocketAddress[])((Address) this.gsd.getAddresses().get("socket")).getObject()),
+ this.gsd.getServiceInterface().getName(),
+ cmd);
+ return ((FactHandle)result);
+
+
+ }
+
+ public FactHandle insert(Object object,
+ boolean dynamic) throws FactException {
+ throw new UnsupportedOperationException( "Not supported yet." );
+ }
+
+ public void retract(org.drools.runtime.rule.FactHandle handle) throws FactException {
+ throw new UnsupportedOperationException( "Not supported yet." );
+ }
+
+ public void update(org.drools.runtime.rule.FactHandle handle,
+ Object object) throws FactException {
+ throw new UnsupportedOperationException( "Not supported yet." );
+ }
+
+ public WorkingMemoryEntryPoint getWorkingMemoryEntryPoint(String name) {
+ throw new UnsupportedOperationException( "Not supported yet." );
+ }
+
+ public String getEntryPointId() {
+ return this.name;
+ }
+
+ public org.drools.runtime.rule.FactHandle getFactHandle(Object object) {
+ throw new UnsupportedOperationException( "Not supported yet." );
+ }
+
+ public Object getObject(org.drools.runtime.rule.FactHandle factHandle) {
+ throw new UnsupportedOperationException( "Not supported yet." );
+ }
+
+ public Collection<Object> getObjects() {
+ throw new UnsupportedOperationException( "Not supported yet." );
+ }
+
+ public Collection<Object> getObjects(ObjectFilter filter) {
+ throw new UnsupportedOperationException( "Not supported yet." );
+ }
+
+ public <T extends org.drools.runtime.rule.FactHandle> Collection<T> getFactHandles() {
+ throw new UnsupportedOperationException( "Not supported yet." );
+ }
+
+ public <T extends org.drools.runtime.rule.FactHandle> Collection<T> getFactHandles(ObjectFilter filter) {
+ throw new UnsupportedOperationException( "Not supported yet." );
+ }
+
+ public long getFactCount() {
+ throw new UnsupportedOperationException( "Not supported yet." );
+ }
+
+
+}
Copied: labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/command/GetWorkingMemoryEntryPointRemoteCommand.java (from rev 35787, labs/jbossrules/trunk/drools-grid/drools-grid-remote-api/src/main/java/org/drools/grid/remote/internal/commands/GetWorkingMemoryEntryPointRemoteCommand.java)
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/command/GetWorkingMemoryEntryPointRemoteCommand.java (rev 0)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/main/java/org/drools/grid/remote/command/GetWorkingMemoryEntryPointRemoteCommand.java 2010-11-10 16:41:47 UTC (rev 35938)
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2010 salaboy.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * under the License.
+ */
+
+package org.drools.grid.remote.command;
+
+/**
+ *
+ * @author salaboy
+ * this class should not exist!
+ */
+
+import org.drools.command.Context;
+import org.drools.command.impl.GenericCommand;
+import org.drools.command.impl.KnowledgeCommandContext;
+import org.drools.runtime.StatefulKnowledgeSession;
+import org.drools.runtime.rule.WorkingMemoryEntryPoint;
+
+public class GetWorkingMemoryEntryPointRemoteCommand
+ implements
+ GenericCommand<WorkingMemoryEntryPoint> {
+
+ private String name;
+
+ public GetWorkingMemoryEntryPointRemoteCommand(String name) {
+ this.name = name;
+ }
+
+ public WorkingMemoryEntryPoint execute(Context context) {
+ StatefulKnowledgeSession ksession = ((KnowledgeCommandContext) context).getStatefulKnowledgesession();
+ WorkingMemoryEntryPoint ep = ksession.getWorkingMemoryEntryPoint( this.name );
+ context.getContextManager().getDefaultContext().set( this.name,
+ ep );
+ // If I return the command I need to create a serializable version of NamedEntryPoint
+ return null;
+ }
+
+ @Override
+ public String toString() {
+ return "session.getWorkingMemoryEntryPoint( " + this.name + " );";
+ }
+}
Modified: labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/test/java/org/drools/grid/NodeTests.java
===================================================================
--- labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/test/java/org/drools/grid/NodeTests.java 2010-11-10 14:58:10 UTC (rev 35937)
+++ labs/jbossrules/trunk/drools-grid/drools-grid-impl/src/test/java/org/drools/grid/NodeTests.java 2010-11-10 16:41:47 UTC (rev 35938)
@@ -18,7 +18,6 @@
package org.drools.grid;
import java.io.Serializable;
-import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import org.drools.KnowledgeBase;
@@ -29,32 +28,19 @@
import org.drools.builder.KnowledgeBuilderErrors;
import org.drools.builder.KnowledgeBuilderFactoryService;
import org.drools.builder.ResourceType;
-import org.drools.command.assertion.AssertEquals;
-import org.drools.command.runtime.rule.InsertObjectCommand;
import org.drools.grid.conf.GridPeerServiceConfiguration;
-import org.drools.grid.conf.impl.GridNodeLocalConfiguration;
-import org.drools.grid.conf.impl.GridNodeSocketConfiguration;
import org.drools.grid.conf.impl.GridPeerConfiguration;
import org.drools.grid.impl.GridImpl;
import org.drools.grid.impl.MultiplexSocketServerImpl;
import org.drools.grid.io.impl.MultiplexSocketServiceCongifuration;
-import org.drools.grid.remote.GridNodeRemoteClient;
import org.drools.grid.remote.mina.MinaAcceptorFactoryService;
-import org.drools.grid.service.directory.Address;
import org.drools.grid.service.directory.WhitePages;
import org.drools.grid.service.directory.impl.CoreServicesLookupConfiguration;
-import org.drools.grid.service.directory.impl.GridServiceDescriptionImpl;
import org.drools.grid.service.directory.impl.WhitePagesLocalConfiguration;
-import org.drools.grid.service.directory.impl.WhitePagesSocketConfiguration;
import org.drools.grid.timer.impl.CoreServicesSchedulerConfiguration;
-import org.drools.grid.timer.impl.RegisterSchedulerConfiguration;
-import org.drools.grid.timer.impl.SchedulerLocalConfiguration;
-import org.drools.grid.timer.impl.SchedulerSocketConfiguration;
import org.drools.io.impl.ByteArrayResource;
import org.drools.runtime.StatefulKnowledgeSession;
import org.drools.runtime.rule.FactHandle;
-import org.drools.time.Scheduler;
-import org.drools.time.SchedulerService;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -152,7 +138,7 @@
Assert.assertNotNull( kbuilder );
String rule = "package test\n"
- + "Rule \"test\""
+ + "rule \"test\""
+ " when"
+ " then"
+ " System.out.println(\"Rule Fired!\");"
@@ -180,15 +166,96 @@
Assert.assertNotNull( session );
- FactHandle handle = session.insert( new MyObject() );
+ FactHandle handle = session.insert( new MyObject("myObj1") );
Assert.assertNotNull( handle );
int i = session.fireAllRules();
Assert.assertEquals( 1,
i );
+
+ remoteN1.dispose();
+ grid1.get(SocketService.class).close();
}
+
+ @Test
+ public void remoteNodeRetractUpdateGlobalsTest() {
+ Grid grid1 = new GridImpl( new HashMap<String, Object>() );
+ configureGrid1( grid1,
+ 8000,
+ null );
+ Grid grid2 = new GridImpl( new HashMap<String, Object>() );
+ configureGrid1( grid2,
+ -1,
+ grid1.get( WhitePages.class ) );
+
+ GridNode n1 = grid1.createGridNode( "n1" );
+ grid1.get( SocketService.class ).addService( "n1", 8000, n1 );
+
+ GridServiceDescription<GridNode> n1Gsd = grid2.get( WhitePages.class ).lookup( "n1" );
+ GridConnection<GridNode> conn = grid2.get( ConnectionFactoryService.class ).createConnection( n1Gsd );
+ GridNode remoteN1 = conn.connect();
+
+ KnowledgeBuilder kbuilder = remoteN1.get( KnowledgeBuilderFactoryService.class ).newKnowledgeBuilder();
+
+ Assert.assertNotNull( kbuilder );
+
+ String rule = "package test\n"
+ + "import org.drools.grid.NodeTests.MyObject;\n"
+ + "global MyObject myGlobalObj;\n"
+ + "rule \"test\""
+ + " when"
+ + " $o: MyObject()"
+ + " then"
+ + " System.out.println(\"My Global Object -> \"+myGlobalObj.getName());"
+ + " System.out.println(\"Rule Fired! ->\"+$o.getName());"
+ + " end";
+
+ kbuilder.add( new ByteArrayResource( rule.getBytes() ),
+ ResourceType.DRL );
+
+ KnowledgeBuilderErrors errors = kbuilder.getErrors();
+ if ( errors != null && errors.size() > 0 ) {
+ for ( KnowledgeBuilderError error : errors ) {
+ System.out.println( "Error: " + error.getMessage() );
+
+ }
+ fail("KnowledgeBase did not build");
+ }
+
+ KnowledgeBase kbase = remoteN1.get( KnowledgeBaseFactoryService.class ).newKnowledgeBase();
+
+ Assert.assertNotNull( kbase );
+
+ kbase.addKnowledgePackages( kbuilder.getKnowledgePackages() );
+
+ StatefulKnowledgeSession session = kbase.newStatefulKnowledgeSession();
+
+ Assert.assertNotNull( session );
+ session.setGlobal("myGlobalObj", new MyObject("myGlobalObj"));
+
+ FactHandle handle = session.insert( new MyObject("myObj1") );
+ Assert.assertNotNull( handle );
+
+ int fired = session.fireAllRules();
+ Assert.assertEquals( 1,
+ fired );
+
+ session.retract(handle);
+
+
+ handle = session.insert(new MyObject("myObj2"));
+
+ session.update(handle, new MyObject("myObj3"));
+
+ fired = session.fireAllRules();
+
+ remoteN1.dispose();
+ grid1.get(SocketService.class).close();
+
+ }
+
private void configureGrid1(Grid grid,
int port,
WhitePages wp) {
@@ -228,11 +295,21 @@
}
- private static class MyObject
+ public static class MyObject
implements
Serializable {
+ private String name;
+ public MyObject(String name) {
+ this.name = name;
+ }
- public MyObject() {
+ public String getName() {
+ return name;
}
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
}
}
\ No newline at end of file
More information about the jboss-svn-commits
mailing list