Ticket #29: NodeImpl.java

File NodeImpl.java, 21.8 kB (added by jordi, 1 year ago)
Line 
1package planet.generic.commonapi;
2import java.util.Collection;
3import java.util.Hashtable;
4import java.util.Iterator;
5import java.util.Vector;
6
7import planet.commonapi.Application;
8import planet.commonapi.EndPoint;
9import planet.commonapi.Id;
10import planet.commonapi.Message;
11import planet.commonapi.Node;
12import planet.commonapi.NodeHandle;
13import planet.commonapi.RouteMessage;
14import planet.commonapi.exception.InitializationException;
15import planet.commonapi.results.ResultsConstraint;
16import planet.commonapi.results.ResultsEdge;
17import planet.generic.commonapi.factory.GenericFactory;
18import planet.simulate.Logger;
19import planet.simulate.MessageListener;
20import planet.simulate.Results;
21import planet.util.Properties;
22import planet.util.Queue;
23import planet.util.QueueFull;
24import planet.util.timer.TimerTask;
25
26/**
27 * Superclass which represents a node in a peer-to-peer system, regardless of
28 * the underlying protocol. All nodes, implement the methods of this class.
29 *
30 * @author Pedro García
31 * @author Carles Pairot
32 * @author Ruben Mondejar
33 */
34public abstract class NodeImpl
35                implements
36                        planet.commonapi.Node,
37                        java.io.Serializable {
38       
39        protected Id id;
40        protected transient Hashtable listeners;
41        private transient Queue incoming;
42        private transient Queue outgoing;
43        private int processed = 0;
44        /**
45         * NodeHandle for the actual Node.
46         */
47        protected NodeHandle nodeHandle = null;
48        /**
49         * To contain int[] with [firstTime][period] values for each task.
50         * If period is zero (0), corresponds with a task to execute only once.
51         */
52        private Vector timer = null;
53        /**
54         * To contain the jobs (TimerTask) to execute at each time.
55         */
56        private Vector tasks = null;
57        private long timerCount = 0; //counter
58        private long timerNext = Long.MAX_VALUE; //next time to activate
59       
60        /**
61         * Local EndPoints.
62         */
63        protected Hashtable endpoints;
64        protected boolean role = true;
65       
66        /**
67         * Initializes internal data structures.
68         */
69        public NodeImpl()  {
70        timer = new Vector(2);
71        tasks = new Vector(2);
72        endpoints = new Hashtable();
73        init();
74    }
75       
76        /**
77         * Inicialite
78         */
79        private void init() {
80                listeners = new Hashtable();
81                incoming = new Queue(Properties.simulatorQueueSize);
82                outgoing = new Queue(Properties.simulatorQueueSize);
83        }
84       
85        /**
86         * The node joins in the network
87         *
88         * @param bootstrap
89         *            Id of a node in the network
90         */
91        public abstract void join(NodeHandle bootstrap);
92       
93        /**
94         * The node leaves the network
95         */
96        public abstract void leave();
97       
98        /**
99         * Given a time fraction, the node it can do everything what needs during
100         * this
101         *
102         * @param actualStep
103         *            Actual step in simulation time.
104         * @return Always true.
105         */
106        public boolean process(int actualStep) {
107                processed = 0;
108                processTasks();
109                return true;
110        }
111       
112        /**
113         * Invokes to each registered Application, by the related EndPoint,
114         * the <b>byStep()</b> method. This inform to each application for
115         * a new step. This method must to be invoked at the end of each
116         * <b>process(int)</b> node implementation.
117         */
118        protected void invokeByStepToAllApplications() {
119            Iterator it = endpoints.values().iterator();
120            while (it.hasNext())
121                ((EndPoint)it.next()).byStep();
122        }
123       
124        /**
125         * Puts a message in the incoming queue of this node
126         *
127         * @param msg
128         *            received Message
129         */
130        public void receive(RouteMessage msg) throws QueueFull {
131                Logger.logReceive(id, msg, Logger.MSG_LOG);
132                incoming.add(msg);
133        }
134   
135    /**
136     * A wrapper method, that sends a RouteMessage with the specified data.
137     * If any exception has ocurred during the send, a log with the description
138     * is made.
139     * @param key Key of the communication.
140     * @param from Communication source
141     * @param to Communication destination
142     * @param nextHop Next hop in the communication.
143     * @param type RouteMessage type
144     * @param mode RouteMessage mode
145     * @param appId Name of the related application.
146     * @param msg Data to be sent with the RouteMessage
147     * @return A valid RouteMessage with the specified data or null, if
148     * any error has ocurred.
149     */
150    public RouteMessage buildMessage(String key, NodeHandle from, NodeHandle to, NodeHandle nextHop, int type, int mode, String appId, Message msg)
151    {
152        RouteMessage bMsg = null;
153        try {
154            bMsg = GenericFactory.getMessage(key,from,to,nextHop,msg,type,mode,appId);
155        } catch (InitializationException e) {
156            Logger.log("ERROR: Cannot get a RouteMessage of MessagePool\n"
157                    + e.getMessage(), Logger.ERROR_LOG);
158        }
159        return bMsg;
160    }
161   
162    /**
163     * Builds a new RouteMessage with all the values appeared in <b>toCopy</b>, and
164     * the specified <b>nextHop</b>.
165     * @param toCopy Message to be cloned.
166     * @return A valid RouteMessage or null if there are any error.
167     */
168    public RouteMessage buildMessage(RouteMessage toCopy)
169    {
170        RouteMessage msg = null;
171        try {
172            msg = GenericFactory.getMessage(toCopy.getKey(),toCopy.getSource(),toCopy.getDestination(),toCopy.getNextHopHandle(),toCopy.getMessage(),toCopy.getType(),toCopy.getMode(),toCopy.getApplicationId());
173            //don't update statistics
174        } catch (InitializationException e)
175        {
176            Logger.log("ERROR: Cannot get a RouteMessage of MessagePool\n" + e.getMessage(), Logger.ERROR_LOG);
177            GenericFactory.freeMessage(msg);
178            msg = null;
179        }
180        return msg;
181    }
182       
183        /**
184         * Puts a message in the outcoming queue of this node
185         *
186         * @param msg
187         *            sended Message
188         */
189        public void send(RouteMessage msg) throws QueueFull {
190                outgoing.add(msg);
191                Logger.logSend(id, msg, Logger.MSG_LOG);
192        }
193       
194        /**
195         * Puts the RouteMessage <b>msg</b> to the outgoing queue of this node.
196         * @param msg RouteMessage to be sent.
197         */
198        public boolean sendMessage(RouteMessage msg)
199        {
200                try {
201                        send(msg);
202                } catch (QueueFull e){
203                        Logger.log("Outgoing Queue of Node " + this.id + " is Full",
204                                        Logger.ERROR_LOG);
205                        GenericFactory.freeMessage(msg);
206            return false;
207                }
208        return true;
209        }
210   
211    /**
212     * A wrapper method, that sends a RouteMessage with the specified data.
213     * If any exception has ocurred during the send, a log with the description
214     * is made.
215     * @param key Key of the communication.
216     * @param from Communication source
217     * @param to Communication destination
218     * @param type RouteMessage type
219     * @param mode RouteMessage mode
220     * @param msg Data to be sent with the RouteMessage
221     */
222    public boolean sendMessage(String key, NodeHandle from, NodeHandle to, int type, int mode, Message msg)
223    {
224        RouteMessage bMsg = null;
225        try {
226            bMsg = GenericFactory.getMessage(key, from, to, type, mode);
227            bMsg.setMessage(msg);
228            return sendMessage(bMsg);
229        } catch (InitializationException e) {
230            Logger.log("ERROR: Cannot get a RouteMessage of MessagePool\n"
231                    + e.getMessage(), Logger.ERROR_LOG);
232            return false;
233        }
234    }
235   
236    /**
237     * A wrapper method, that sends a RouteMessage with the specified data.
238     * If any exception has ocurred during the send, a log with the description
239     * is made.
240     * @param key Key of the communication.
241     * @param from Communication source
242     * @param to Communication destination
243     * @param nextHop Next hop in the communication.
244     * @param type RouteMessage type
245     * @param mode RouteMessage mode
246     * @param msg Data to be sent with the RouteMessage
247     */
248    public boolean sendMessage(String key, NodeHandle from, NodeHandle to, NodeHandle nextHop, int type, int mode, Message msg)
249    {
250        RouteMessage bMsg = null;
251        try {
252            bMsg = GenericFactory.getMessage(key,from,to,nextHop,msg,type,mode,null);
253            return sendMessage(bMsg);
254        } catch (InitializationException e) {
255            Logger.log("ERROR: Cannot get a RouteMessage of MessagePool\n"
256                    + e.getMessage(), Logger.ERROR_LOG);
257            return false;
258        }
259    }
260   
261    /**
262     * A wrapper method, that send a RouteMessage with the specified data.
263     * If any error has ocurred during the send, a log with the description
264     * is made.
265     * @param rMsg RouteMessage to be used.
266     * @param key Communication key
267     * @param from Communication source.
268     * @param to Communication destination.
269     * @param type RouteMessage type
270     * @param mode RouteMessage mode
271     * @param msg Data to be sent with the RouteMessage
272     */
273    public void sendMessage(RouteMessage rMsg,String key, NodeHandle from, NodeHandle to, NodeHandle nextHop,int type, int mode, Message msg)
274    {
275        rMsg.setValues(key,from,to,nextHop,type,mode,msg,rMsg.getApplicationId());
276        sendMessage(rMsg);
277    }
278       
279        /**
280         * Returns the present outgoing queue of this node
281         *
282         * @return outgoing Queue of Messages
283         */
284        public Queue outMessages() {
285                return outgoing;
286        }
287   
288    public Queue inMessages()
289    {
290        return incoming;
291    }
292       
293        /**
294         * Checks if the incoming queue have a messages to send
295         *
296         * @return return true if has incoming messages to process
297         */
298        protected boolean hasMoreMessages() {
299                return processed < Properties.simulatorProcessedMessages  && incoming.size() > 0;
300        }
301       
302        /**
303         * Return the next message and dequeue this of the incoming queue
304         *
305         * @return return the next Message
306         */
307        protected RouteMessage nextMessage() {
308                processed++;
309                return (RouteMessage) incoming.remove();
310        }
311
312        /**
313         * Returns the id of the node *
314         *
315         * @return Id node identificator
316         */
317        public Id getId() {
318                return id;
319        }
320       
321        /**
322         * Adds a listener to the node so that it executes herself when the message
323         * response arrives
324         *
325         * @param key
326         *            String representation of id routing message
327         * @param listener
328         *            MessageListener linked to Message
329         */
330        public void addMessageListener(String key, MessageListener listener) {
331                listeners.put(key, listener);
332        }
333       
334        /**
335         * Remove the message listener of the node
336         *
337         * @param key
338         *            String representation of id routing message
339         */
340        public void removeMessageListener(String key) {
341                listeners.remove(key);
342        }
343       
344        /**
345         * Returns information of the node
346         *
347         * @return info Hashtable with the information
348         */
349        public abstract Hashtable getInfo();
350        private void writeObject(java.io.ObjectOutputStream out)
351                        throws java.io.IOException {
352                out.defaultWriteObject();
353        }
354       
355        private void readObject(java.io.ObjectInputStream in)
356                        throws java.io.IOException, ClassNotFoundException {
357                in.defaultReadObject();
358                init();
359        }
360       
361        /**
362         * Returns the local NodeHandle
363         * @see planet.commonapi.Node#getLocalHandle()
364         * @return The actual local NodeHandle
365         */
366        public NodeHandle getLocalHandle() {
367                return nodeHandle;
368        }
369       
370        /**
371         * Sets a task to be executed periodicly at each <b>period</b> of time.
372         * @see planet.commonapi.Node#setTimer(planet.util.timer.TimerTask, long, long)
373         * @param task Job to do at each activation of the task.
374         * @param firstTime First activation of the task, measured in steps or millis.
375         * Its value is the relative time, not in absolute time.
376         * @param period Number of steps or millis to periodicly execute the task.
377         */
378        public void setTimer(TimerTask task, long firstTime, long period) {
379                timer.add(new long[]{firstTime,period});
380                tasks.add(task);
381                timerNext = (firstTime<timerNext)?firstTime:timerNext;
382        }
383       
384        /**
385         * Sets a task to be executed only one time at specified <b>firstTime</b>.
386         * @see planet.commonapi.Node#setTimer(planet.util.timer.TimerTask, long)
387         * @param task Job to do at activation.
388         * @param firstTime Moment to be activated, in steps or millis, measured
389         * in relative time, not in absolute time.
390         */
391        public void setTimer(TimerTask task, long firstTime) {
392                setTimer(task,firstTime,0);
393        }
394       
395        /**
396         * Evaluates if requires any timertask.
397         *
398         */
399        private void processTasks() {
400                timerCount++;
401                if (timerNext>timerCount) return;
402                long[] temp = null;
403                boolean evaluate;
404                int i=0;
405                while(i< timer.size()) {
406                        evaluate = true;
407                        temp = (long[])timer.get(i);
408                        temp[0]=temp[0]-timerCount;
409                        if (temp[0]<=0) {
410                                ((TimerTask)tasks.get(i)).run();
411                                if (temp[1]==0) { //if period==0 ==> only once
412                                        timer.remove(i);
413                                        tasks.remove(i);
414                                        i--; //update index
415                                        evaluate = false;
416                                } else {
417                                        temp[0]=temp[1];
418                                }
419                        }
420                        if (evaluate)
421                                timerNext = (timerNext>temp[0])?temp[0]:timerNext;
422                        i++;
423                }
424                timerCount=0;
425        }
426       
427        /**
428         * This returns a VirtualizedNode specific to the given application and
429         * instance name to the application, which the application can then use in
430         * order to send an receive messages.
431         *
432         * @param app
433         *            The Application
434         * @param instance
435         *            An identifier for a given instance
436         * @return The endpoint specific to this application, which can be used
437         *         for message sending/receiving. Return null if cannot build the
438         *         required EndPoint.
439         */
440        public EndPoint registerApplication(Application app, String instance) {
441                EndPoint endpoint = null;
442                try {
443                        endpoint = GenericFactory.buildEndPoint(app, this);
444                        endpoints.put(app.getId(), endpoint);
445                        app.setEndPoint(endpoint);
446                } catch (InitializationException e) {
447                        Logger.log("Cannot build a new EndPoint for this Application ["
448                                        + app + "] and Node [" + this.getId() + "].",
449                                        Logger.ERROR_LOG);
450                        e.printStackTrace();
451                }
452                return endpoint;
453        }
454       
455        /**
456         * Get the registered application by the <b>instanceName</b> name.
457         * @param instanceName Name of the registered application.
458         * @return null if there isn't an instance of <b>instanceName</b>, or
459         * the related endpoint for the application.
460         */
461        public EndPoint getRegisteredApplication(String instanceName)
462        {
463            return (EndPoint)endpoints.get(instanceName);
464        }
465        /**
466         * Returns all references of the applications that mantains this node.
467         *
468         * @return An array with all references to the applications that they are
469         *         mantained for this node.
470         */
471        public Application[] getRegisteredApplications() {
472                Collection values = this.endpoints.values();
473                Application[] apps = new Application[values.size()];
474                Iterator it = values.iterator();
475                for (int i = 0; it.hasNext(); i++) {
476                        apps[i] = ((EndPoint) it.next()).getApplication();
477                }
478                return apps;
479        }
480
481    /**
482         * The playsGoodRole's method is an extension method for commonapi specs. This method is
483         * used to allow BehavioursPool decide  wether the <b>Node</b> should run behaviours for
484         * good  peers or instead run  behaviours for bad peers. Moreover, this method  lets the
485         * programmer the responsability of decide node's role. Even it allows a node to  have a
486         * transient behaviour, sometimes behaving as a good peer and sometines behaving as a bad
487         * peer.
488         * @return True if the Node's Role is positive.
489         */
490        public boolean playsGoodRole() {
491                return role;
492        }
493        /**
494         * The setGoodRole's method is an extension method for commonapi specs. This method is
495         * used to set the role of the node inside the overlay network. A true value means the
496         * node will behave according to the underlying overlay protocol. A false  value means
497         * the node will misbehave.
498         * @param role
499         */
500        public void setGoodRole(boolean role) {
501                this.role = role;
502        }
503        /**
504         * The isLocalMessage's method is an extension method for commonapi specs. This method
505         * is used to allow BehavioursPool decide wether the incoming RouteMessage  is for the
506         * local node or for a  remote node. Remenber, this decision may only be addressed  by
507         * the underlying overlay protocol. For example, for Symphony's Lookup protocol a node
508         * is  responsible for all  RouteMessages whose keys have as an immediate succesor the
509         * node's id or have as destination the own node. 
510         * @see planet.commonapi.behaviours.BehavioursPool
511         * @return True if the incoming RouteMessage taken as input its for the local node.
512         */
513        public boolean isLocalMessage(RouteMessage msg) {
514                return true;
515        }
516       
517        /* BEGIN **************************** GML  ****************************/
518       
519    /**
520     * Adds new ResultsEdges to <b>edgeCollection</b> according to specified
521     * <b>constraint</b>.
522     * @param resultName Result type name to use.
523     * @param it Iterator over a collection of nodeHandles.
524     * @param constraint Specific constraints to test all built edges.
525     * @param edgeCollection Edges collection where they have to be added.
526     * @param color Fill color of the edge.
527     */
528    public void addEdges(String resultName, Iterator it, ResultsConstraint constraint, 
529            Collection edgeCollection, String color) {
530        ResultsEdge edge = null;
531        NodeHandle nh =