001 package org.LiveGraph.dataCache; 002 003 import java.io.IOException; 004 import java.util.List; 005 006 import org.LiveGraph.LiveGraph; 007 import org.LiveGraph.dataFile.common.DataFormatException; 008 import org.LiveGraph.dataFile.read.DataStreamObserver; 009 import org.LiveGraph.dataFile.read.DataStreamReader; 010 import org.LiveGraph.events.Event; 011 import org.LiveGraph.events.EventListener; 012 import org.LiveGraph.events.EventManager; 013 import org.LiveGraph.events.EventProcessingException; 014 import org.LiveGraph.events.EventProducer; 015 import org.LiveGraph.events.EventType; 016 017 import com.softnetConsult.utils.exceptions.ThrowableTools; 018 019 import static org.LiveGraph.dataCache.DataUpdateEvent.*; 020 021 022 /** 023 * This reader will parse a data stream using {@link DataStreamReader} and store all 024 * information in a data cache for further processing by the application.<br /> 025 * <br /> 026 * See {@link org.LiveGraph.dataFile.write.DataStreamWriter} for the details of the data file format. 027 * 028 * <p style="font-size:smaller;">This product includes software developed by the 029 * <strong>LiveGraph</strong> project and its contributors.<br /> 030 * (<a href="http://www.live-graph.org" target="_blank">http://www.live-graph.org</a>)<br /> 031 * Copyright (c) 2007-2008 G. Paperin.<br /> 032 * All rights reserved. 033 * </p> 034 * <p style="font-size:smaller;">File: DataStreamToCacheReader.java</p> 035 * <p style="font-size:smaller;">Redistribution and use in source and binary forms, with or 036 * without modification, are permitted provided that the following terms and conditions are met: 037 * </p> 038 * <p style="font-size:smaller;">1. Redistributions of source code must retain the above 039 * acknowledgement of the LiveGraph project and its web-site, the above copyright notice, 040 * this list of conditions and the following disclaimer.<br /> 041 * 2. Redistributions in binary form must reproduce the above acknowledgement of the 042 * LiveGraph project and its web-site, the above copyright notice, this list of conditions 043 * and the following disclaimer in the documentation and/or other materials provided with 044 * the distribution.<br /> 045 * 3. All advertising materials mentioning features or use of this software or any derived 046 * software must display the following acknowledgement:<br /> 047 * <em>This product includes software developed by the LiveGraph project and its 048 * contributors.<br />(http://www.live-graph.org)</em><br /> 049 * 4. All advertising materials distributed in form of HTML pages or any other technology 050 * permitting active hyper-links that mention features or use of this software or any 051 * derived software must display the acknowledgment specified in condition 3 of this 052 * agreement, and in addition, include a visible and working hyper-link to the LiveGraph 053 * homepage (http://www.live-graph.org). 054 * </p> 055 * <p style="font-size:smaller;">THIS SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY 056 * OF ANY KIND, EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 057 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL 058 * THE AUTHORS, CONTRIBUTORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 059 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR 060 * IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 061 * </p> 062 * 063 * @author Greg Paperin (<a href="http://www.paperin.org" target="_blank">http://www.paperin.org</a>) 064 * @version {@value org.LiveGraph.LiveGraph#version} 065 */ 066 public class DataStreamToCacheReader implements DataStreamObserver, EventListener, EventProducer { 067 068 /** 069 * The maximum period of time the reader will wait to be able to do an update before canceling the update. 070 */ 071 public static final long maxWaitForUpdate = 500L; 072 073 /** 074 * Cache for storage of extracted data. 075 */ 076 private DataCache cache = null; 077 078 /** 079 * Whether an update is currently running. 080 */ 081 private boolean updateInProgress = false; 082 083 /** 084 * Data reader used for the previous update. 085 */ 086 private DataStreamReader previousReader = null; 087 088 /** 089 * Creates a data reader on the specified stream. 090 * 091 * @param cache The data cache into which to store the data. 092 */ 093 public DataStreamToCacheReader(DataCache cache) { 094 095 if (null == cache) 096 throw new NullPointerException("Cannot use a null cache."); 097 098 this.cache = cache; 099 this.previousReader = null; 100 this.updateInProgress = false; 101 } 102 103 104 /** 105 * Reads as many data lines from the underlying stream as there are available, parses the lines and 106 * stores the extracted information (if any) in this reader's data cache. 107 * 108 * @param reader Data source. 109 * @param closeAfterRead Whether to close reader after reading. 110 * @throws IOException If an I/O error occurs. 111 * @throws DataFormatException If the data stream contents do not conform with the expected data 112 * stream format. 113 * @see org.LiveGraph.dataFile.write.DataStreamWriter 114 * @see org.LiveGraph.dataFile.read.DataStreamReader 115 */ 116 private void readFromStream(DataStreamReader reader, boolean closeAfterRead) 117 throws IOException, DataFormatException { 118 synchronized (reader) { 119 120 synchronized (cache) { 121 cache.bulkOperationStart(); 122 try { 123 if (LiveGraph.application().getDataFileSettings().getDoNotCacheData()) 124 cache.resetCache(); 125 while(checkReaderOpen(reader) && reader.ready()) { 126 reader.readFromStream(DataCache.MAX_DELAYED_EVENTS + 1); 127 try { cache.wait(1); } 128 catch(InterruptedException e) { } 129 } 130 } finally { 131 cache.bulkOperationCompleted(); 132 } 133 } 134 135 if (closeAfterRead) 136 reader.close(); 137 } 138 } 139 140 /** 141 * Whether an update is currently running. 142 * @return Whether an update is currently running. 143 */ 144 public synchronized boolean isUpdateInProgress() { 145 return updateInProgress; 146 } 147 148 /** 149 * Sets the internal {@code updateInProgress} state. 150 * @param state The new state. 151 */ 152 private synchronized void setUpdateInProgress(boolean state) { 153 updateInProgress = state; 154 } 155 156 /** 157 * Checks whether the specified reader is still open by trying to execute an action on it. 158 * 159 * @param reader A data stream reader. 160 * @return {@code true} if the reader is not closed, {@code false} otherwise. 161 */ 162 private boolean checkReaderOpen(DataStreamReader reader) { 163 synchronized(reader) { 164 try { 165 reader.ready(); 166 return true; 167 } catch(IOException e) { 168 return false; 169 } 170 } 171 } 172 173 /** 174 * Used by {@code checkUpdateCanStart}: Verifies that an update may begin by first checking 175 * whether another update is not in progress and then requesting all event listeners to verify a 176 * {@code UPDIN_UpdateStart}-event. 177 * 178 * @return A readily validated event of type {@code DataUpdateEvent.UPDIN_UpdateStart} if an update may begin, 179 * {@code null} if the verification was not successfull. 180 */ 181 private Event<DataUpdateEvent> doCheckUpdateCanStart() { 182 183 // Check that another update is not in progress: 184 if (isUpdateInProgress()) 185 return null; 186 187 // Validate update: 188 EventManager eventManager = LiveGraph.application().eventManager(); 189 Event<DataUpdateEvent> event = new Event<DataUpdateEvent>(this, DataUpdateEvent.class, UPDIN_UpdateStart); 190 191 try { 192 if (eventManager.validateEvent(event)) 193 return event; 194 } catch (EventProcessingException e) { } 195 return null; 196 } 197 198 /** 199 * Verifies that an update may begin by first checking whether another update is 200 * not in progress and then requesting all event listeners to verify a 201 * {@code UPDIN_UpdateStart}-event. If the verification is not successful, the method 202 * pauses the current thread and then reattempts the verification. This may be repeated 203 * several times. If the verification was not successfull after {@code maxWaitForUpdate} 204 * milliseconds, it will be regarded as failed. 205 * This private method assumes that the calling method has synchronised on {@code this}. 206 * 207 * @param reader Data source for the update. 208 * 209 * @return A readily validated event of type {@code DataUpdateEvent.UPDIN_UpdateStart} if an update may begin, 210 * {@code null} otherwise. 211 */ 212 private Event<DataUpdateEvent> checkUpdateCanStart(DataStreamReader reader) { 213 214 Event<DataUpdateEvent> startEvent = doCheckUpdateCanStart(); 215 if (null != startEvent) 216 return startEvent; 217 218 long startedWaiting = System.currentTimeMillis(); 219 do { 220 221 // If the reader is not open any more, we do not need to wait and can fail-fast: 222 if (!checkReaderOpen(reader)) 223 return null; 224 225 try { this.wait(5); } 226 catch(InterruptedException e) { } 227 228 startEvent = doCheckUpdateCanStart(); 229 if (null != startEvent) 230 return startEvent; 231 } while(System.currentTimeMillis() - startedWaiting < maxWaitForUpdate); 232 233 return null; 234 } 235 236 /** 237 * Attempts to initiate a data update in a new thread. 238 * First this method verifies that no other update is already in progress, then it 239 * validates a {@code UPDIN_UpdateStart}-event against all listeners. If both 240 * succeeds, a new thread is started that will read the stream into the cache. 241 * The verification may be attempted several times, but no longer than 242 * {@code maxWaitForUpdate} milliseconds. 243 * 244 * @param reader Data source. 245 * @param closeAfterRead Whether to close reader after reading. 246 */ 247 private synchronized void startDataUpdate(final DataStreamReader reader, final boolean closeAfterRead) { 248 249 if (null == reader) 250 return; 251 252 Event<DataUpdateEvent> startEvent = checkUpdateCanStart(reader); 253 if (null == startEvent) 254 return; 255 256 setUpdateInProgress(true); 257 258 // Even if reader is closed during this operation it should not fail, no no need for synch: 259 if (previousReader != reader) { 260 reader.addObserver(this); 261 previousReader = reader; 262 } 263 264 try { 265 266 LiveGraph.application().eventManager().raiseEvent(startEvent); 267 268 Runnable updateWorker = new Runnable() { 269 public void run() { 270 try { 271 try { 272 readFromStream(reader, closeAfterRead); 273 } catch(Exception e) { 274 raiseUpdateFinishedError(e); 275 } 276 raiseUpdateFinishedSusccess(); 277 } finally { 278 setUpdateInProgress(false); 279 } 280 } 281 }; 282 283 Thread readerThread = new Thread(updateWorker, "LiveGraph Data File Reader"); 284 readerThread.start(); 285 286 // Make the best effort to reset the in-progress flag if an error occurs: 287 } catch(RuntimeException e) { 288 setUpdateInProgress(false); 289 throw e; 290 } catch(Error e) { 291 setUpdateInProgress(false); 292 throw e; 293 } 294 } 295 296 297 /** 298 * Raises an event to notify listeners that an update has finished without any errors. 299 */ 300 private void raiseUpdateFinishedSusccess() { 301 Event<DataUpdateEvent> event = new Event<DataUpdateEvent>(this, DataUpdateEvent.class, 302 DataUpdateEvent.UPDIN_UpdateFinishSuccess); 303 LiveGraph.application().eventManager().raiseEvent(event); 304 } 305 306 /** 307 * Raises an event to notify listeners that an update has finished with errors. 308 * 309 * @param err The {@code Throwable} describing the error. 310 */ 311 private void raiseUpdateFinishedError(Throwable err) { 312 313 String shortMsg = err.getMessage() + "(" + err.getClass().getSimpleName() + ")"; 314 String longMsg = ThrowableTools.stackTraceToString(err); 315 316 Event<DataUpdateEvent> event = new Event<DataUpdateEvent>(this, DataUpdateEvent.class, 317 DataUpdateEvent.UPDIN_UpdateFinishError, 318 shortMsg); 319 LiveGraph.application().eventManager().raiseEvent(event); 320 LiveGraph.application().guiManager().logErrorLn(longMsg); 321 } 322 323 324 /** 325 * Used for callback by the {@link DataStreamReader}; does nothing. 326 */ 327 public void eventCommentLine(String line, DataStreamReader reader) { 328 ; // No action required. 329 } 330 331 /** 332 * Used for callback by the {@link DataStreamReader}; adds a dataset to the cache. 333 */ 334 public void eventDataLineRead(List<String> dataTokens, int datasetIndex, DataStreamReader reader) { 335 336 List<Double> vals = DataStreamReader.convertTokensToDoubles(dataTokens); 337 DataSet ds = new DataSet(vals, datasetIndex); 338 cache.addDataSet(ds); 339 } 340 341 /** 342 * Used for callback by the {@link DataStreamReader}; adds a file info line to the cache. 343 */ 344 public void eventFileInfoLine(String info, DataStreamReader reader) { 345 cache.addDataFileInfo(info); 346 } 347 348 /** 349 * Used for callback by the {@link DataStreamReader}; setts the data column labels in the cache. 350 */ 351 public void eventLabelsSet(List<String> labels, DataStreamReader reader) { 352 List<String> uniqueLabels = DataStreamReader.createUniqueLabels(labels, true); 353 cache.resetLabels(uniqueLabels); 354 } 355 356 /** 357 * Used for callback by the {@link DataStreamReader}; does nothing. 358 */ 359 public void eventSeparatorSet(String separator, DataStreamReader reader) { 360 ; // No action required. 361 } 362 363 364 /** 365 * Permits to register as listener with the main LiveGraph event manager and 366 * only with the main LiveGraph event manager. 367 * 368 * @param manager The {@code EventManager} for the registering attempt. 369 * @return {@code (LiveGraph.application().eventManager() == manager)}. 370 * @see EventListener#permissionRegisterWithEventManager(EventManager) 371 */ 372 public boolean permissionRegisterWithEventManager(EventManager manager) { 373 return LiveGraph.application().eventManager() == manager; 374 } 375 376 /** 377 * Does not permit any unregistering. 378 * 379 * @param manager The {@code EventManager} for the registering attempt. 380 * @return {@code false}. 381 * @see EventListener#permissionUnregisterWithEventManager(EventManager) 382 */ 383 public boolean permissionUnregisterWithEventManager(EventManager manager) { 384 return false; 385 } 386 387 /** 388 * Does nothing. 389 * 390 * @param manager The {@code EventManager} with which this {@code EventListener} is now registered. 391 * @see EventListener#completedRegisterWithEventManager(EventManager) 392 */ 393 public void completedRegisterWithEventManager(EventManager manager) { } 394 395 /** 396 * Does nothing. 397 * 398 * @param manager The {@code EventManager} with which this {@code EventListener} is now unregistered. 399 * @see EventListener#completedUnregisterWithEventManager(EventManager) 400 */ 401 public void completedUnregisterWithEventManager(EventManager manager) { } 402 403 /** 404 * Does nothing. 405 * 406 * @param event An event in which this {@code EventListener} may be interested. 407 * @return {@code false}. 408 * @see EventListener#checkEventInterest(Event) 409 */ 410 public boolean checkEventInterest(Event<? extends EventType> event) { 411 return false; 412 } 413 414 /** 415 * Does nothing. 416 * 417 * @param event The event to be validated. 418 * @param soFar Whether {@code event} has been successfuly validated by whichever {@code EventListener}s 419 * (if any) were invoked to validate {@code event} before this {@code EventListener}. 420 * @return {@code true}. 421 * @see EventListener#checkEventValid(Event, boolean) 422 */ 423 public boolean checkEventValid(Event<? extends EventType> event, boolean soFar) { 424 return true; 425 } 426 427 /** 428 * Processes LiveGraph events. 429 * 430 * @param event The event to process. 431 */ 432 public void eventRaised(Event<? extends EventType> event) throws Exception { 433 434 if (event.getDomain() == DataUpdateEvent.class) { 435 processDataUpdateEvent(event.cast(DataUpdateEvent.class)); 436 return; 437 } 438 } 439 440 /** 441 * Attempts to initiate an update when a {@code UPDIN_InitiateUpdate}-event is received. 442 * 443 * @param event The event to process. 444 */ 445 private void processDataUpdateEvent(Event<DataUpdateEvent> event) { 446 447 if (UPDIN_InitiateUpdate == event.getType()) { 448 startDataUpdate((DataStreamReader) event.getInfoObject(), event.getInfoBoolean()); 449 return; 450 } 451 452 } 453 454 /** 455 * Does nothing. 456 * @param event Event that cause an exception. 457 * @param exception The exception. 458 * @return {@code false}. 459 */ 460 public boolean eventProcessingException(Event<? extends EventType> event, EventProcessingException exception) { 461 return false; 462 } 463 464 /** 465 * Does nothing. 466 * @param event Event that was processed. 467 */ 468 public void eventProcessingFinished(Event<? extends EventType> event) { } 469 470 } // public class DataStreamToCacheReader