001    package org.LiveGraph.dataCache;
002    
003    import java.io.File;
004    import java.io.FileInputStream;
005    import java.io.FileNotFoundException;
006    import java.io.IOException;
007    import java.io.InputStream;
008    
009    import org.LiveGraph.LiveGraph;
010    import org.LiveGraph.dataCache.DataCache.CacheMode;
011    import org.LiveGraph.dataFile.read.DataStreamReader;
012    import org.LiveGraph.dataFile.read.PipedInputStream;
013    import org.LiveGraph.dataFile.write.DataStreamWriter;
014    import org.LiveGraph.dataFile.write.DataStreamWriterFactory;
015    import org.LiveGraph.events.Event;
016    import org.LiveGraph.events.EventListener;
017    import org.LiveGraph.events.EventManager;
018    import org.LiveGraph.events.EventProcessingException;
019    import org.LiveGraph.events.EventProducer;
020    import org.LiveGraph.events.EventType;
021    import org.LiveGraph.settings.DataFileSettings;
022    import org.LiveGraph.settings.SettingsEvent;
023    
024    import com.softnetConsult.utils.exceptions.ThrowableTools;
025    
026    
027    /**
 * An object of this class is used to trigger updates from a data input stream
029     * into a {@link DataCache} at regular intervals.
030     * 
031     * <p style="font-size:smaller;">This product includes software developed by the
032     *    <strong>LiveGraph</strong> project and its contributors.<br />
033     *    (<a href="http://www.live-graph.org" target="_blank">http://www.live-graph.org</a>)<br />
034     *    Copyright (c) 2007-2008 G. Paperin.<br />
035     *    All rights reserved.
036     * </p>
037     * <p style="font-size:smaller;">File: UpdateInvoker.java</p> 
038     * <p style="font-size:smaller;">Redistribution and use in source and binary forms, with or
039     *    without modification, are permitted provided that the following terms and conditions are met:
040     * </p>
041     * <p style="font-size:smaller;">1. Redistributions of source code must retain the above
042     *    acknowledgement of the LiveGraph project and its web-site, the above copyright notice,
043     *    this list of conditions and the following disclaimer.<br />
044     *    2. Redistributions in binary form must reproduce the above acknowledgement of the
045     *    LiveGraph project and its web-site, the above copyright notice, this list of conditions
046     *    and the following disclaimer in the documentation and/or other materials provided with
047     *    the distribution.<br />
048     *    3. All advertising materials mentioning features or use of this software or any derived
049     *    software must display the following acknowledgement:<br />
050     *    <em>This product includes software developed by the LiveGraph project and its
051     *    contributors.<br />(http://www.live-graph.org)</em><br />
052     *    4. All advertising materials distributed in form of HTML pages or any other technology
053     *    permitting active hyper-links that mention features or use of this software or any
054     *    derived software must display the acknowledgment specified in condition 3 of this
055     *    agreement, and in addition, include a visible and working hyper-link to the LiveGraph
056     *    homepage (http://www.live-graph.org).
057     * </p>
058     * <p style="font-size:smaller;">THIS SOFTWARE IS PROVIDED &quot;AS IS&quot;, WITHOUT WARRANTY
059     *    OF ANY KIND, EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
060     *    MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND  NONINFRINGEMENT. IN NO EVENT SHALL
061     *    THE AUTHORS, CONTRIBUTORS OR COPYRIGHT  HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
062     *    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING  FROM, OUT OF OR
063     *    IN CONNECTION WITH THE SOFTWARE OR THE USE OR  OTHER DEALINGS IN THE SOFTWARE.
064     * </p>
065     * 
066     * @author Greg Paperin (<a href="http://www.paperin.org" target="_blank">http://www.paperin.org</a>)
067     * @version {@value org.LiveGraph.LiveGraph#version}
068     */
069    public class UpdateInvoker implements Runnable, EventListener, EventProducer {
070    
071    /**
072     * How long to sleep for when updates are to be invoked automatically.
073     */
074    private static final long timeTickLength = 50; // milliseconds 
075    
076    /**
077     * Determines wether the invoker is in memory-stream mode.
078     * @see #startMemoryStreamMode(InputStream)
079     */
080    private boolean memoryStreamMode = false;
081    
082    /**
083     * The data reader for the input stream.
084     */
085    private DataStreamReader dataReader = null;
086    
087    /**
088     * Cache to hold the data.
089     */
090    private DataCache dataCache = null;
091    
092    /**
093     * Data file from which to update.
094     */
095    private File dataFile = null;
096    
097    /**
098     * Interval between updates in milliseconds.
099     */
100    private long interval = -1;
101    
102    /**
103     * Whether the invoker thread should wind up at the next possibility.
104     */
105    private boolean mustQuit = false;
106    
107    /**
108     * Remaining milliseconds till the next update.
109     */
110    private long remainingMillis = -1;
111    
112    /**
113     * System milliseconds at last update.
114     */
115    private long lastUpdateTime = 0;
116    
117    /**
118     * Milliseconds since last update.
119     */
120    private long sinceUpdateTime = 0;
121    
122    
/**
 * Creates an invoker that feeds the specified cache. The new invoker has no data file
 * and no data reader, is not in memory-stream mode, and has automatic updates
 * switched off (the interval is {@code -1}).
 *
 * @param cache The application's data cache.
 * @throws NullPointerException If {@code cache} is {@code null}.
 */
public UpdateInvoker(DataCache cache) {

        if (cache == null) {
                throw new NullPointerException("Cannot read data into a null cache");
        }

        // Timing state: no automatic updates until an interval is configured.
        this.sinceUpdateTime = 0;
        this.lastUpdateTime = 0;
        this.remainingMillis = -1;
        this.interval = -1;

        // Life-cycle and mode flags.
        this.mustQuit = false;
        this.memoryStreamMode = false;

        // No input source is known yet.
        this.dataReader = null;
        this.dataFile = null;

        this.dataCache = cache;
}
146    
147    /**
148     * Sets the length of the interval between automatic data updates in milliseconds.
149     * If {@code interval <= 0} the update will not be triggered automatically.
150     * 
151     * @param interval The length of the interval between automatic data updates in milliseconds
152     * (if {@code interval <= 0} the update will not be triggered automatically).
153     */
154    private synchronized void setInterval(long interval) {
155            
156            if (interval == this.interval)
157                    return;
158            
159            if (this.interval <= 0 && interval > 0)
160                    this.lastUpdateTime = 0;
161            
162            this.interval = interval;
163            this.notifyAll();
164    }
165    
166    /**
167     * The length of the interval between data updates.
168     * If {@code interval <= 0} the update will not be triggered automatically.
169     * @return The length of the interval between automatic data updates in milliseconds;
170     * a value {@code interval <= 0} indicated that no updates will be triggered automatically.
171     */
172    public synchronized long getInterval() {
173            return interval;
174    }
175    
176    /**
177     * Used to notify this invoker that is must stop running at the next possibility.
178     * 
179     * @param val Whether this invoker should stop running at the next possibility.
180     */
181    public synchronized void setMustQuit(boolean val) {
182            this.mustQuit = val;
183            this.notifyAll();
184    }
185    
186    /**
187     * Time to next update.
188     * 
189     * @return Number of milliseconds left until the next update.
190     */
191    public synchronized long getRemainingMillis() {
192            return remainingMillis;
193    }
194    
195    private void resetCache(CacheMode mode) {
196            synchronized (dataCache) {
197                    if (null == mode)
198                            dataCache.resetCache();
199                    else
200                            dataCache.resetCache(mode);
201        }
202    }
203    
204    /**
205     * Sets the file from which the next update will be read and closes the previously used reader.
206     * 
207     * @param fileName File from which to read the data from now on.
208     * @throws IllegalStateException If no valid data cache is set.
209     */
210    private void setDataFile(String fileName) {
211            
212            if (null == fileName)
213                    throw new NullPointerException("Cannot read data from a null file name");
214            
215            File file = new File(fileName);
216                    
217            closeDataReader();
218            dataFile = file;
219    }
220    
221    
222    private synchronized void closeDataReader() {
223    
224            if (null == dataReader)
225                    return;
226            
227            synchronized(dataReader) {
228                    try {
229                            dataReader.close();
230                    } catch(IOException e) {
231                            LiveGraph.application().guiManager().logErrorLn(ThrowableTools.stackTraceToString(e));
232                    } finally {
233                            dataReader = null;
234                    }
235            }
236    }
237    
238    /**
239     * Tells whether this invoker's reader's underlying data stream is ready to be read.
240     * 
241     * @return {@code true} if the next {@code readFromStream()} is guaranteed not to block for input,
242     * {@code false} otherwise. Note that returning {@code false} does not guarantee that the next read will block.
243     * @throws IOException If an I/O error occurs.
244     */
245    public synchronized boolean ready() throws IOException {
246            
247            if (null == dataReader)
248                    return false;
249            
250            try {
251                    synchronized (dataReader) {
252                            return dataReader.ready();
253                    }
254            } catch(NullPointerException e) {
255                    // This will happen when dataReader became null after the check. It's ok, just return.
256                    return false;
257            }
258    }
259    
260    private void raiseCannotInitiateUpdate(String message) {
261            Event<DataUpdateEvent> event = new Event<DataUpdateEvent>(this, DataUpdateEvent.class,
262                                                                                                                              DataUpdateEvent.UPDIN_CannotInitiateUpdate,
263                                                                                                                              null == message ? "" : message);
264            LiveGraph.application().eventManager().raiseEvent(event);
265    }
266    
267    /**
268     * Creates a new {@link DataStreamToCacheReader} for a stream on the currently set data file.
269     * Called by {@link #requestUpdate()} either when no data reader is available
270     * ({@code dataReader} is {@code null}), or if a data reader is available, but "do not cacha data"
271     * is activated.
272     * 
273     * @return {@code true} if a data reader was opened, {@code false} if a data reader could not be
274     * successfully opened (also raises an {@code UPDIN_CannotInitiateUpdate}-event).
275     */
276    private boolean openDataFileReader() {
277            
278            if (null == dataFile || 0 == dataFile.getPath().length()) {
279                    raiseCannotInitiateUpdate("No data input specified");
280                    return false;
281            }
282                    
283            closeDataReader();
284    
285            try {
286                    dataReader = new DataStreamReader(new FileInputStream(dataFile));
287            } catch(FileNotFoundException e) {
288                    raiseCannotInitiateUpdate("File not found: " + dataFile.getName());
289                    LiveGraph.application().guiManager().logErrorLn(e.getMessage());
290                    return false;
291            } catch (Exception e) {
292                    raiseCannotInitiateUpdate(e.getMessage());
293                    LiveGraph.application().guiManager().logErrorLn(e.getMessage());
294                    return false;
295            }
296            
297            return true;
298    }
299    
300    /**
301     * Raises a {@code UPDIN_InitiateUpdate}-event to notify the {@code DataStreamToCacheReader} that
302     * it is time for a data update.
303     */
304    public synchronized void requestUpdate() {
305            
306            lastUpdateTime = System.currentTimeMillis();
307            
308            // Reopen data reader:
309            if (null == dataReader || LiveGraph.application().getDataFileSettings().getDoNotCacheData()) {
310                    
311                    if (!openDataFileReader())
312                            return;
313            }
314    
315            try {
316                    synchronized (dataReader) {
317                        
318                            if (null == dataReader)
319                                    return;
320                            
321                            Event<DataUpdateEvent> event = new Event<DataUpdateEvent>(
322                                                                                                    this, DataUpdateEvent.class,
323                                                                                                    DataUpdateEvent.UPDIN_InitiateUpdate,
324                                                                                                    LiveGraph.application().getDataFileSettings().getDoNotCacheData(),
325                                                                                                    0L, Double.NaN,
326                                                                                                    dataReader);
327                            LiveGraph.application().eventManager().raiseEvent(event);
328                    }
329            } catch(NullPointerException e) {
330                    // This will happen when dataReader became null after the check or when it could not be created.
331                    // It's ok, just return.
332            }
333    }
334    
335    /**
336     * Send the this invoker to sleep for {@code timeTickLength} milliseconds.
337     * When it wakes it, internal time state is updated an the observers notified.
338     */
339    private void timeTick() {
340            
341            synchronized(this) {
342                    
343                    try {
344                            if (interval < 0)
345                                    this.wait();
346                            else
347                                    this.wait(timeTickLength);
348                    } catch(InterruptedException e) { }
349                    
350                    sinceUpdateTime = System.currentTimeMillis() - lastUpdateTime;
351                    remainingMillis = interval <= 0 ? -1 : Math.max(0, interval - sinceUpdateTime);
352            }
353            
354            raiseTimerTick();
355    }
356    
357    /**
358     * Raises an event to notify listeners that this invoker has waken up to process events.
359     * This gives listeners displaying various information about this invoker a chance to
360     * update their state.
361     */
362    private void raiseTimerTick() {
363            Event<DataUpdateEvent> event = new Event<DataUpdateEvent>(this, DataUpdateEvent.class,
364                                                                                                                                            DataUpdateEvent.UPDIN_TimerTick);
365            LiveGraph.application().eventManager().raiseEvent(event);
366    }
367    
368    
369    /**
370     * Winds up the operations by closing the current data reader.
371     */
372    private void tidyUp() {
373            closeDataReader();
374    }
375    
376    /**
377     * Main invoker loop:
378     * call {@link #timeTick()};
379     * if it is time for the next update, call {@link #requestUpdate()};
380     * call {@link #timeTick()} again and continue the loop until {@link #mustQuit} is set to true;
381     * call {@link #tidyUp()} before quitting. 
382     */
383    public void run() {
384                    
385            while (true) {
386                    
387                    try {
388                            synchronized (this) {
389                                    if (mustQuit) {
390                                            tidyUp();
391                                            return;
392                                    }
393                    }
394                            
395                            timeTick();
396                            
397                            if (sinceUpdateTime >= interval && interval > 0)
398                                    requestUpdate();
399                    } catch(Throwable e) {
400                            LiveGraph.application().guiManager().logErrorLn(ThrowableTools.stackTraceToString(e));
401                    }
402            }
403    }
404    
405    /**
406     * Uses a pipe buffer of 5 MB. To customise the buffer size, create your own streams and use
407     * {@link #startMemoryStreamMode(InputStream)} to initiate the memory stream mode.
408     * @return xxx
409     */
410    public DataStreamWriter startMemoryStreamMode() {
411            
412            try {
413                    PipedInputStream ins = new PipedInputStream(5 * 1024 * 1024); // 5 MB
414                    DataStreamWriter outw = DataStreamWriterFactory.createDataWriter(ins);
415                    if (!startMemoryStreamMode(ins))
416                            return null;
417                    return outw;
418            } catch(IOException e) {
419                    return null;
420            }
421            
422    }
423    
424    public boolean startMemoryStreamMode(InputStream in) throws IOException {
425            
426            // Check not null
427            if (null == in)
428                    throw new NullPointerException("Cannot use a null stream for memory stream mode");
429            
430            // Check stream class:
431            if (in instanceof java.io.PipedInputStream) {
432                    final String correctClassName = org.LiveGraph.dataFile.read.PipedInputStream.class.getCanonicalName();
433                    throw new IllegalArgumentException(
434                                            String.format("Streams of type java.io.PipedInputStream are incompatible with the memory"
435                                                                    + " stream mode. %nUse %1$s instead. %n"
436                                                                    + "For further info see the API reference for %1$s.",
437                                                                    correctClassName));
438            }
439            
440            // Check stream open:
441            in.available();
442            
443            // Validate switching to memory stream mode:
444            EventManager eventManager = LiveGraph.application().eventManager();
445            Event<DataUpdateEvent> startEvent = new Event<DataUpdateEvent>(this, DataUpdateEvent.class,
446                                                                                                                                       DataUpdateEvent.UPDIN_StartMemoryStreamMode);
447            try {
448                    if (!eventManager.validateEvent(startEvent))
449                            return false;
450            } catch(EventProcessingException e) {
451                    return false;
452            }
453            
454            // Memory stream is only possible in caching mode:
455            DataFileSettings dfs = LiveGraph.application().getDataFileSettings(); 
456            dfs.setDoNotCacheData(false);
457            if (dfs.getDoNotCacheData())
458                    return false;
459            
460            // Make sure all previous events are processed:
461            LiveGraph.application().eventManager().waitForEvents();
462            
463            synchronized (this) {
464            
465                    // Close old reader and open memory stream reader:
466                    closeDataReader();
467                    resetCache(null);
468                    dataReader = new DataStreamReader(in);
469                    memoryStreamMode = true;
470                    eventManager.raiseEvent(startEvent);
471                    return true;
472            }
473    }
474    
475    public synchronized boolean endMemoryStreamMode() {
476            
477            // Validate switching:
478            Event<DataUpdateEvent> endEvent = new Event<DataUpdateEvent>(this, DataUpdateEvent.class,
479                                                                                                                                     DataUpdateEvent.UPDIN_EndMemoryStreamMode);
480            try {
481                    if (!LiveGraph.application().eventManager().validateEvent(endEvent))
482                            return false;
483            } catch(EventProcessingException e) {
484                    return false;
485            }
486    
487            // Close the memory stream reader:
488            closeDataReader();
489            memoryStreamMode = false;
490            
491            // Set the data file and cache mode according to the current settings:
492            setDataFile(LiveGraph.application().getDataFileSettings().getDataFile());
493            
494            LiveGraph.application().eventManager().raiseEvent(endEvent);
495            return true;
496    }
497    
498    /**
499     * Permits to register as listener with the main LiveGraph event manager and
500     * only with the main LiveGraph event manager.
501     * 
502     * @param manager The {@code EventManager} for the registering attempt.
503     * @return {@code (LiveGraph.application().eventManager() == manager)}.
504     * @see EventListener#permissionRegisterWithEventManager(EventManager)
505     */
506    public boolean permissionRegisterWithEventManager(EventManager manager) {
507            return LiveGraph.application().eventManager() == manager;
508    }
509    
510    /**
511     * Does not permit any unregistering.
512     * 
513     * @param manager The {@code EventManager} for the registering attempt.
514     * @return {@code false}.
515     * @see EventListener#permissionUnregisterWithEventManager(EventManager)
516     */
517    public boolean permissionUnregisterWithEventManager(EventManager manager) {
518            return false;
519    }
520    
521    /**
522     * Does nothing.
523     * 
524     * @param manager The {@code EventManager} with which this {@code EventListener} is now registered.
525     * @see EventListener#completedRegisterWithEventManager(EventManager)
526     */
527    public void completedRegisterWithEventManager(EventManager manager) { }
528    
529    /**
530     * Does nothing.
531     * 
532     * @param manager The {@code EventManager} with which this {@code EventListener} is now unregistered.
533     * @see EventListener#completedUnregisterWithEventManager(EventManager)
534     */
535    public void completedUnregisterWithEventManager(EventManager manager) { }
536    
537    /**
538     * Does nothing.
539     * 
540     * @param event An event in which this {@code EventListener} may be interested.
541     * @return {@code false}.
542     * @see EventListener#checkEventInterest(Event)
543     */
544    public boolean checkEventInterest(Event<? extends EventType> event) {
545            return false;
546    }
547    
548    /**
549     * Validates (or not) settings event. When in "memory stream mode" this invoker will
550     * not valudate changing of cache mode (all-data/tail-only or cache/dont-cache) or changing
551     * the data file.
552     * 
553     * @param event The event to be validated.
554     * @param soFar Whether {@code event} has been successfuly validated by whichever {@code EventListener}s
555     * (if any) were invoked to validate {@code event} before this {@code EventListener}.
556     * 
557     * @return {@code false} if this invoker is in {@code memoryStreamMode} and if the event is of the types
558     * {@code [DFS_DataFile, DFS_DoNotCacheData, DFS_ShowOnlyTailData, DFS_Load]}.
559     * 
560     * @see EventListener#checkEventValid(Event, boolean)
561     */
562    public boolean checkEventValid(Event<? extends EventType> event, boolean soFar) {
563            
564            if (!memoryStreamMode)
565                    return true;
566            
567            EventType eventType = event.getType();
568            if (SettingsEvent.DFS_DataFile == eventType
569                            || SettingsEvent.DFS_DoNotCacheData == eventType
570                            || SettingsEvent.DFS_ShowOnlyTailData == eventType
571                            || SettingsEvent.DFS_Load == eventType) {
572                    
573                    synchronized(this) {
574                            if (memoryStreamMode)
575                                    return false;
576                    }
577            }
578            
579            return true;
580    }
581    
582    /**
583     * Processes events.
584     * 
585     * @param event Event to process.
586     */
587    public void eventRaised(Event<? extends EventType> event) {
588            
589            if (null == event)
590                    return;
591            
592            if (event.getDomain() == SettingsEvent.class) {
593                    processSettingsEvent(event.cast(SettingsEvent.class));
594            }
595    }
596    
597    /**
598     * When the application's settings change, this method is called in order
599     * to update the internal state accordingly.
600     * 
601     * @param event Describes the change event.
602     */
603    private void processSettingsEvent(Event<SettingsEvent> event) {
604    
605            switch(event.getType()) {
606                    
607                    case DFS_DataFile:                      
608                            synchronized(this) {
609                                    if (memoryStreamMode)  break;
610                                    setDataFile((String) event.getInfoObject());
611                                    resetCache(null);                       
612                                    requestUpdate();
613                            }
614                            break;
615                            
616                    case DFS_UpdateFrequency:
617                            setInterval(event.getInfoLong());
618                            break;
619                                                    
620                    case DFS_DoNotCacheData:                        
621                            synchronized(this) {
622                                    if (memoryStreamMode)  break;
623                                    closeDataReader();
624                                    if (!event.getInfoBoolean())
625                                            resetCache(null);
626                            }
627                            break;
628                            
629                    case DFS_ShowOnlyTailData:
630                            synchronized(this) {
631                                    if (memoryStreamMode)  break;
632                                    DataCache.CacheMode newMode = (event.getInfoBoolean()
633                                                                                                                    ? DataCache.CacheMode.CacheTailData
634                                                                                                                    : DataCache.CacheMode.CacheAllData);
635                                    DataCache.CacheMode oldMode = (dataCache.getCacheMode());
636                                    
637                                    if (newMode != oldMode) {
638                                            
639                                            closeDataReader();
640                                            resetCache(newMode);
641                                            requestUpdate();
642                                    }
643                            }
644                            break;
645                            
646                    case DFS_Load:
647                            synchronized(this) {
648                                    if (memoryStreamMode)  break;
649                                    DataFileSettings dfs = LiveGraph.application().getDataFileSettings();
650                                    setInterval(dfs.getUpdateFrequency());
651                                    setDataFile(dfs.getDataFile());
652                                    resetCache(dfs.getShowOnlyTailData()
653                                                                                            ? DataCache.CacheMode.CacheTailData
654                                                                                            : DataCache.CacheMode.CacheAllData);
655                                    requestUpdate();
656                            }
657                            break;
658                            
659                    default:
660                            break;
661            }
662    }
663    
664    /**
665     * Objects of this class do not handle {@code eventProcessingFinished} notifications.
666     * 
667     * @param event Ignored.
668     */
669    public void eventProcessingFinished(Event<? extends EventType> event) { }
670    
671    /**
672     * Objects of this class do not handle {@code eventProcessingException} notofications.
673     * 
674     * @param event Ignored. 
675     * @param exception Never actually thrown.
676     * @return {@code false}.
677     */
678    public boolean eventProcessingException(Event<? extends EventType> event, EventProcessingException exception) {
679            return false;
680    }
681    
682    }  // public class UpdateInvoker