001    package org.LiveGraph.dataCache;
002    
003    import java.io.IOException;
004    import java.util.List;
005    
006    import org.LiveGraph.LiveGraph;
007    import org.LiveGraph.dataFile.common.DataFormatException;
008    import org.LiveGraph.dataFile.read.DataStreamObserver;
009    import org.LiveGraph.dataFile.read.DataStreamReader;
010    import org.LiveGraph.events.Event;
011    import org.LiveGraph.events.EventListener;
012    import org.LiveGraph.events.EventManager;
013    import org.LiveGraph.events.EventProcessingException;
014    import org.LiveGraph.events.EventProducer;
015    import org.LiveGraph.events.EventType;
016    
017    import com.softnetConsult.utils.exceptions.ThrowableTools;
018    
019    import static org.LiveGraph.dataCache.DataUpdateEvent.*;
020    
021    
022    /**
023     * This reader will parse a data stream using {@link DataStreamReader} and store all
024     * information in a data cache for further processing by the application.<br />
025     * <br />
026     * See {@link org.LiveGraph.dataFile.write.DataStreamWriter} for the details of the data file format. 
027     * 
028     * <p style="font-size:smaller;">This product includes software developed by the
029     *    <strong>LiveGraph</strong> project and its contributors.<br />
030     *    (<a href="http://www.live-graph.org" target="_blank">http://www.live-graph.org</a>)<br />
031     *    Copyright (c) 2007-2008 G. Paperin.<br />
032     *    All rights reserved.
033     * </p>
034     * <p style="font-size:smaller;">File: DataStreamToCacheReader.java</p> 
035     * <p style="font-size:smaller;">Redistribution and use in source and binary forms, with or
036     *    without modification, are permitted provided that the following terms and conditions are met:
037     * </p>
038     * <p style="font-size:smaller;">1. Redistributions of source code must retain the above
039     *    acknowledgement of the LiveGraph project and its web-site, the above copyright notice,
040     *    this list of conditions and the following disclaimer.<br />
041     *    2. Redistributions in binary form must reproduce the above acknowledgement of the
042     *    LiveGraph project and its web-site, the above copyright notice, this list of conditions
043     *    and the following disclaimer in the documentation and/or other materials provided with
044     *    the distribution.<br />
045     *    3. All advertising materials mentioning features or use of this software or any derived
046     *    software must display the following acknowledgement:<br />
047     *    <em>This product includes software developed by the LiveGraph project and its
048     *    contributors.<br />(http://www.live-graph.org)</em><br />
049     *    4. All advertising materials distributed in form of HTML pages or any other technology
050     *    permitting active hyper-links that mention features or use of this software or any
051     *    derived software must display the acknowledgment specified in condition 3 of this
052     *    agreement, and in addition, include a visible and working hyper-link to the LiveGraph
053     *    homepage (http://www.live-graph.org).
054     * </p>
055     * <p style="font-size:smaller;">THIS SOFTWARE IS PROVIDED &quot;AS IS&quot;, WITHOUT WARRANTY
056     *    OF ANY KIND, EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
057     *    MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND  NONINFRINGEMENT. IN NO EVENT SHALL
058     *    THE AUTHORS, CONTRIBUTORS OR COPYRIGHT  HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
059     *    LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING  FROM, OUT OF OR
060     *    IN CONNECTION WITH THE SOFTWARE OR THE USE OR  OTHER DEALINGS IN THE SOFTWARE.
061     * </p>
062     * 
063     * @author Greg Paperin (<a href="http://www.paperin.org" target="_blank">http://www.paperin.org</a>)
064     * @version {@value org.LiveGraph.LiveGraph#version}
065     */
066    public class DataStreamToCacheReader implements DataStreamObserver, EventListener, EventProducer {
067    
068    /**
069     * The maximum period of time the reader will wait to be able to do an update before canceling the update.
070     */
071    public static final long maxWaitForUpdate = 500L;
072    
073    /**
074     * Cache for storage of extracted data.
075     */
076    private DataCache cache = null;
077    
078    /**
079     * Whether an update is currently running. 
080     */
081    private boolean updateInProgress = false;
082    
083    /**
084     * Data reader used for the previous update.
085     */
086    private DataStreamReader previousReader = null;
087    
088    /**
089     * Creates a data reader on the specified stream.
090     * 
091     * @param cache The data cache into which to store the data. 
092     */
093    public DataStreamToCacheReader(DataCache cache) {
094            
095            if (null == cache)
096                    throw new NullPointerException("Cannot use a null cache.");
097            
098            this.cache = cache;
099            this.previousReader = null;
100            this.updateInProgress = false;
101    }
102    
103    
104    /**
105     * Reads as many data lines from the underlying stream as there are available, parses the lines and
106     * stores the extracted information (if any) in this reader's data cache.
107     *  
108     * @param reader Data source.
109     * @param closeAfterRead Whether to close reader after reading. 
110     * @throws IOException If an I/O error occurs.
111     * @throws DataFormatException If the data stream contents do not conform with the expected data
112     * stream format.
113     * @see org.LiveGraph.dataFile.write.DataStreamWriter
114     * @see org.LiveGraph.dataFile.read.DataStreamReader
115     */
116    private void readFromStream(DataStreamReader reader, boolean closeAfterRead)
117                                                                                                                                    throws IOException, DataFormatException {
118            synchronized (reader) {
119                    
120                    synchronized (cache) {
121                            cache.bulkOperationStart();
122                            try {
123                                    if (LiveGraph.application().getDataFileSettings().getDoNotCacheData())
124                                            cache.resetCache();
125                                    while(checkReaderOpen(reader) && reader.ready()) {
126                                            reader.readFromStream(DataCache.MAX_DELAYED_EVENTS + 1);
127                                            try { cache.wait(1); }
128                                            catch(InterruptedException e) { }
129                                    }
130                            } finally {
131                                    cache.bulkOperationCompleted();
132                            }
133                    }
134                    
135                    if (closeAfterRead)
136                            reader.close();
137            }
138    }
139    
140    /**
141     * Whether an update is currently running.
142     * @return Whether an update is currently running.
143     */
144    public synchronized boolean isUpdateInProgress() {
145            return updateInProgress;
146    }
147    
148    /**
149     * Sets the internal {@code updateInProgress} state.
150     * @param state The new state.
151     */
152    private synchronized void setUpdateInProgress(boolean state) {
153            updateInProgress = state;
154    }
155    
156    /**
157     * Checks whether the specified reader is still open by trying to execute an action on it.
158     * 
159     * @param reader A data stream reader.
160     * @return {@code true} if the reader is not closed, {@code false} otherwise.
161     */
162    private boolean checkReaderOpen(DataStreamReader reader) {
163            synchronized(reader) {
164                try {
165                    reader.ready();
166                    return true;
167                } catch(IOException e) {
168                    return false;
169                }
170        }
171    }
172    
173    /**
174     * Used by {@code checkUpdateCanStart}: Verifies that an update may begin by first checking
175     * whether another update is not in progress and then requesting all event listeners to verify a
176     * {@code UPDIN_UpdateStart}-event.
177     * 
178     * @return A readily validated event of type {@code DataUpdateEvent.UPDIN_UpdateStart} if an update may begin,
179     * {@code null} if the verification was not successfull.
180     */
181    private Event<DataUpdateEvent> doCheckUpdateCanStart() {
182            
183            // Check that another update is not in progress:
184            if (isUpdateInProgress())
185                    return null;
186            
187            // Validate update:
188            EventManager eventManager = LiveGraph.application().eventManager();
189            Event<DataUpdateEvent> event = new Event<DataUpdateEvent>(this, DataUpdateEvent.class, UPDIN_UpdateStart);
190            
191            try {
192                    if (eventManager.validateEvent(event))
193                            return event;
194            } catch (EventProcessingException e) { }
195            return null;
196    }
197    
198    /**
199     * Verifies that an update may begin by first checking whether another update is
200     * not in progress and then requesting all event listeners to verify a
201     * {@code UPDIN_UpdateStart}-event. If the verification is not successful, the method
202     * pauses the current thread and then reattempts the verification. This may be repeated
203     * several times. If the verification was not successfull after {@code maxWaitForUpdate}
204     * milliseconds, it will be regarded as failed. 
205     * This private method assumes that the calling method has synchronised on {@code this}.
206     * 
207     * @param reader Data source for the update.
208     * 
209     * @return A readily validated event of type {@code DataUpdateEvent.UPDIN_UpdateStart} if an update may begin,
210     * {@code null} otherwise.
211     */
212    private Event<DataUpdateEvent> checkUpdateCanStart(DataStreamReader reader) {
213            
214            Event<DataUpdateEvent> startEvent = doCheckUpdateCanStart();
215            if (null != startEvent)
216                    return startEvent;
217            
218            long startedWaiting = System.currentTimeMillis();
219            do {
220                    
221                    // If the reader is not open any more, we do not need to wait and can fail-fast:
222                    if (!checkReaderOpen(reader))
223                            return null;
224                    
225                    try { this.wait(5); }
226                    catch(InterruptedException e) { }
227                    
228                    startEvent = doCheckUpdateCanStart();
229                    if (null != startEvent)
230                            return startEvent;
231            } while(System.currentTimeMillis() - startedWaiting < maxWaitForUpdate);
232            
233            return null;
234    }
235    
236    /**
237     * Attempts to initiate a data update in a new thread.
238     * First this method verifies that no other update is already in progress, then it
239     * validates a {@code UPDIN_UpdateStart}-event against all listeners. If both
240     * succeeds, a new thread is started that will read the stream into the cache.
241     * The verification may be attempted several times, but no longer than
242     * {@code maxWaitForUpdate} milliseconds.
243     * 
244     * @param reader Data source.
245     * @param closeAfterRead Whether to close reader after reading. 
246     */
247    private synchronized void startDataUpdate(final DataStreamReader reader, final boolean closeAfterRead) {
248            
249            if (null == reader)
250                    return;
251            
252            Event<DataUpdateEvent> startEvent = checkUpdateCanStart(reader);
253            if (null == startEvent)
254                    return;
255                    
256            setUpdateInProgress(true);
257            
258            // Even if reader is closed during this operation it should not fail, no no need for synch:
259            if (previousReader != reader) {
260                    reader.addObserver(this);
261                    previousReader = reader;
262            }
263            
264            try {
265            
266                    LiveGraph.application().eventManager().raiseEvent(startEvent);
267                    
268                    Runnable updateWorker = new Runnable() {
269                            public void run() {
270                                    try {
271                                            try {
272                                                    readFromStream(reader, closeAfterRead);                         
273                                            } catch(Exception e) {                          
274                                                    raiseUpdateFinishedError(e);
275                                            }
276                                            raiseUpdateFinishedSusccess();
277                                    } finally {
278                                            setUpdateInProgress(false);
279                                    }
280                            }
281                    };
282                    
283                    Thread readerThread = new Thread(updateWorker, "LiveGraph Data File Reader");
284                    readerThread.start();
285            
286            // Make the best effort to reset the in-progress flag if an error occurs:
287            } catch(RuntimeException e) {
288                    setUpdateInProgress(false);
289                    throw e;
290            } catch(Error e) {
291                    setUpdateInProgress(false);
292                    throw e;
293            }
294    }
295    
296    
297    /**
298     * Raises an event to notify listeners that an update has finished without any errors.
299     */
300    private void raiseUpdateFinishedSusccess() {
301            Event<DataUpdateEvent> event = new Event<DataUpdateEvent>(this, DataUpdateEvent.class,
302                                                                                                                                    DataUpdateEvent.UPDIN_UpdateFinishSuccess);
303            LiveGraph.application().eventManager().raiseEvent(event);
304    }
305    
306    /**
307     * Raises an event to notify listeners that an update has finished with errors.
308     *  
309     * @param err The {@code Throwable} describing the error.
310     */
311    private void raiseUpdateFinishedError(Throwable err) {
312            
313            String shortMsg = err.getMessage() + "(" + err.getClass().getSimpleName() + ")";
314            String longMsg = ThrowableTools.stackTraceToString(err);
315            
316            Event<DataUpdateEvent> event = new Event<DataUpdateEvent>(this, DataUpdateEvent.class,
317                                                                                                                              DataUpdateEvent.UPDIN_UpdateFinishError,
318                                                                                                                              shortMsg);
319            LiveGraph.application().eventManager().raiseEvent(event);
320            LiveGraph.application().guiManager().logErrorLn(longMsg);
321    }
322    
323    
324    /**
325     * Used for callback by the {@link DataStreamReader}; does nothing.
326     */
327    public void eventCommentLine(String line, DataStreamReader reader) {
328            ; // No action required.        
329    }
330    
331    /**
332     * Used for callback by the {@link DataStreamReader}; adds a dataset to the cache.
333     */
334    public void eventDataLineRead(List<String> dataTokens, int datasetIndex, DataStreamReader reader) {
335            
336            List<Double> vals = DataStreamReader.convertTokensToDoubles(dataTokens);
337            DataSet ds = new DataSet(vals, datasetIndex);
338            cache.addDataSet(ds);   
339    }
340    
341    /**
342     * Used for callback by the {@link DataStreamReader}; adds a file info line to the cache.
343     */
344    public void eventFileInfoLine(String info, DataStreamReader reader) {
345            cache.addDataFileInfo(info);    
346    }
347    
348    /**
349     * Used for callback by the {@link DataStreamReader}; setts the data column labels in the cache.
350     */
351    public void eventLabelsSet(List<String> labels, DataStreamReader reader) {
352            List<String> uniqueLabels = DataStreamReader.createUniqueLabels(labels, true); 
353            cache.resetLabels(uniqueLabels);        
354    }
355    
356    /**
357     * Used for callback by the {@link DataStreamReader}; does nothing.
358     */
359    public void eventSeparatorSet(String separator, DataStreamReader reader) {
360            ; // No action required.        
361    }
362    
363    
364    /**
365     * Permits to register as listener with the main LiveGraph event manager and
366     * only with the main LiveGraph event manager.
367     * 
368     * @param manager The {@code EventManager} for the registering attempt.
369     * @return {@code (LiveGraph.application().eventManager() == manager)}.
370     * @see EventListener#permissionRegisterWithEventManager(EventManager)
371     */
372    public boolean permissionRegisterWithEventManager(EventManager manager) {
373            return LiveGraph.application().eventManager() == manager;
374    }
375    
376    /**
377     * Does not permit any unregistering.
378     * 
379     * @param manager The {@code EventManager} for the registering attempt.
380     * @return {@code false}.
381     * @see EventListener#permissionUnregisterWithEventManager(EventManager)
382     */
383    public boolean permissionUnregisterWithEventManager(EventManager manager) {
384            return false;
385    }
386    
387    /**
388     * Does nothing.
389     * 
390     * @param manager The {@code EventManager} with which this {@code EventListener} is now registered.
391     * @see EventListener#completedRegisterWithEventManager(EventManager)
392     */
393    public void completedRegisterWithEventManager(EventManager manager) { }
394    
395    /**
396     * Does nothing.
397     * 
398     * @param manager The {@code EventManager} with which this {@code EventListener} is now unregistered.
399     * @see EventListener#completedUnregisterWithEventManager(EventManager)
400     */
401    public void completedUnregisterWithEventManager(EventManager manager) { }
402    
403    /**
404     * Does nothing.
405     * 
406     * @param event An event in which this {@code EventListener} may be interested.
407     * @return {@code false}.
408     * @see EventListener#checkEventInterest(Event)
409     */
410    public boolean checkEventInterest(Event<? extends EventType> event) {
411            return false;
412    }
413    
414    /**
415     * Does nothing.
416     * 
417     * @param event The event to be validated.
418     * @param soFar Whether {@code event} has been successfuly validated by whichever {@code EventListener}s
419     * (if any) were invoked to validate {@code event} before this {@code EventListener}.
420     * @return {@code true}.
421     * @see EventListener#checkEventValid(Event, boolean)
422     */
423    public boolean checkEventValid(Event<? extends EventType> event, boolean soFar) {
424            return true;
425    }
426    
427    /**
428     * Processes LiveGraph events.
429     * 
430     * @param event The event to process.
431     */
432    public void eventRaised(Event<? extends EventType> event) throws Exception {
433    
434            if (event.getDomain() == DataUpdateEvent.class) {
435                    processDataUpdateEvent(event.cast(DataUpdateEvent.class));
436                    return;
437            }
438    }
439    
440    /**
441     * Attempts to initiate an update when a {@code UPDIN_InitiateUpdate}-event is received.
442     * 
443     * @param event The event to process.
444     */
445    private void processDataUpdateEvent(Event<DataUpdateEvent> event) {
446            
447            if (UPDIN_InitiateUpdate == event.getType()) {
448                    startDataUpdate((DataStreamReader) event.getInfoObject(), event.getInfoBoolean());
449                    return;
450            }
451            
452    }
453    
454    /**
455     * Does nothing.
456     * @param event Event that cause an exception.
457     * @param exception The exception.
458     * @return {@code false}.
459     */
460    public boolean eventProcessingException(Event<? extends EventType> event, EventProcessingException exception) {
461            return false;
462    }
463    
464    /**
465     * Does nothing.
466     *  @param event Event that was processed.
467     */
468    public void eventProcessingFinished(Event<? extends EventType> event) {   }
469    
470    }  // public class DataStreamToCacheReader