001 package org.LiveGraph.dataCache; 002 003 import java.io.File; 004 import java.io.FileInputStream; 005 import java.io.FileNotFoundException; 006 import java.io.IOException; 007 import java.io.InputStream; 008 009 import org.LiveGraph.LiveGraph; 010 import org.LiveGraph.dataCache.DataCache.CacheMode; 011 import org.LiveGraph.dataFile.read.DataStreamReader; 012 import org.LiveGraph.dataFile.read.PipedInputStream; 013 import org.LiveGraph.dataFile.write.DataStreamWriter; 014 import org.LiveGraph.dataFile.write.DataStreamWriterFactory; 015 import org.LiveGraph.events.Event; 016 import org.LiveGraph.events.EventListener; 017 import org.LiveGraph.events.EventManager; 018 import org.LiveGraph.events.EventProcessingException; 019 import org.LiveGraph.events.EventProducer; 020 import org.LiveGraph.events.EventType; 021 import org.LiveGraph.settings.DataFileSettings; 022 import org.LiveGraph.settings.SettingsEvent; 023 024 import com.softnetConsult.utils.exceptions.ThrowableTools; 025 026 027 /** 028 * An object of this class is used to triger updates from a data input stream 029 * into a {@link DataCache} at regular intervals. 030 * 031 * <p style="font-size:smaller;">This product includes software developed by the 032 * <strong>LiveGraph</strong> project and its contributors.<br /> 033 * (<a href="http://www.live-graph.org" target="_blank">http://www.live-graph.org</a>)<br /> 034 * Copyright (c) 2007-2008 G. Paperin.<br /> 035 * All rights reserved. 036 * </p> 037 * <p style="font-size:smaller;">File: UpdateInvoker.java</p> 038 * <p style="font-size:smaller;">Redistribution and use in source and binary forms, with or 039 * without modification, are permitted provided that the following terms and conditions are met: 040 * </p> 041 * <p style="font-size:smaller;">1. Redistributions of source code must retain the above 042 * acknowledgement of the LiveGraph project and its web-site, the above copyright notice, 043 * this list of conditions and the following disclaimer.<br /> 044 * 2. Redistributions in binary form must reproduce the above acknowledgement of the 045 * LiveGraph project and its web-site, the above copyright notice, this list of conditions 046 * and the following disclaimer in the documentation and/or other materials provided with 047 * the distribution.<br /> 048 * 3. All advertising materials mentioning features or use of this software or any derived 049 * software must display the following acknowledgement:<br /> 050 * <em>This product includes software developed by the LiveGraph project and its 051 * contributors.<br />(http://www.live-graph.org)</em><br /> 052 * 4. All advertising materials distributed in form of HTML pages or any other technology 053 * permitting active hyper-links that mention features or use of this software or any 054 * derived software must display the acknowledgment specified in condition 3 of this 055 * agreement, and in addition, include a visible and working hyper-link to the LiveGraph 056 * homepage (http://www.live-graph.org). 057 * </p> 058 * <p style="font-size:smaller;">THIS SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY 059 * OF ANY KIND, EXPRESSED OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF 060 * MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL 061 * THE AUTHORS, CONTRIBUTORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 062 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR 063 * IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 064 * </p> 065 * 066 * @author Greg Paperin (<a href="http://www.paperin.org" target="_blank">http://www.paperin.org</a>) 067 * @version {@value org.LiveGraph.LiveGraph#version} 068 */ 069 public class UpdateInvoker implements Runnable, EventListener, EventProducer { 070 071 /** 072 * How long to sleep for when updates are to be invoked automatically. 073 */ 074 private static final long timeTickLength = 50; // milliseconds 075 076 /** 077 * Determines wether the invoker is in memory-stream mode. 078 * @see #startMemoryStreamMode(InputStream) 079 */ 080 private boolean memoryStreamMode = false; 081 082 /** 083 * The data reader for the input stream. 084 */ 085 private DataStreamReader dataReader = null; 086 087 /** 088 * Cache to hold the data. 089 */ 090 private DataCache dataCache = null; 091 092 /** 093 * Data file from which to update. 094 */ 095 private File dataFile = null; 096 097 /** 098 * Interval between updates in milliseconds. 099 */ 100 private long interval = -1; 101 102 /** 103 * Whether the invoker thread should wind up at the next possibility. 104 */ 105 private boolean mustQuit = false; 106 107 /** 108 * Remaining milliseconds till the next update. 109 */ 110 private long remainingMillis = -1; 111 112 /** 113 * System milliseconds at last update. 114 */ 115 private long lastUpdateTime = 0; 116 117 /** 118 * Milliseconds since last update. 119 */ 120 private long sinceUpdateTime = 0; 121 122 123 /** 124 * Constructs a new invoker. 125 * @param cache The application's data cache. 126 */ 127 public UpdateInvoker(DataCache cache) { 128 129 if (null == cache) 130 throw new NullPointerException("Cannot read data into a null cache"); 131 132 this.dataCache = cache; 133 134 this.dataFile = null; 135 this.dataReader = null; 136 137 this.memoryStreamMode = false; 138 139 this.mustQuit = false; 140 141 this.interval = -1; 142 this.remainingMillis = -1; 143 this.lastUpdateTime = 0; 144 this.sinceUpdateTime = 0; 145 } 146 147 /** 148 * Sets the length of the interval between automatic data updates in milliseconds. 149 * If {@code interval <= 0} the update will not be triggered automatically. 150 * 151 * @param interval The length of the interval between automatic data updates in milliseconds 152 * (if {@code interval <= 0} the update will not be triggered automatically). 153 */ 154 private synchronized void setInterval(long interval) { 155 156 if (interval == this.interval) 157 return; 158 159 if (this.interval <= 0 && interval > 0) 160 this.lastUpdateTime = 0; 161 162 this.interval = interval; 163 this.notifyAll(); 164 } 165 166 /** 167 * The length of the interval between data updates. 168 * If {@code interval <= 0} the update will not be triggered automatically. 169 * @return The length of the interval between automatic data updates in milliseconds; 170 * a value {@code interval <= 0} indicated that no updates will be triggered automatically. 171 */ 172 public synchronized long getInterval() { 173 return interval; 174 } 175 176 /** 177 * Used to notify this invoker that is must stop running at the next possibility. 178 * 179 * @param val Whether this invoker should stop running at the next possibility. 180 */ 181 public synchronized void setMustQuit(boolean val) { 182 this.mustQuit = val; 183 this.notifyAll(); 184 } 185 186 /** 187 * Time to next update. 188 * 189 * @return Number of milliseconds left until the next update. 190 */ 191 public synchronized long getRemainingMillis() { 192 return remainingMillis; 193 } 194 195 private void resetCache(CacheMode mode) { 196 synchronized (dataCache) { 197 if (null == mode) 198 dataCache.resetCache(); 199 else 200 dataCache.resetCache(mode); 201 } 202 } 203 204 /** 205 * Sets the file from which the next update will be read and closes the previously used reader. 206 * 207 * @param fileName File from which to read the data from now on. 208 * @throws IllegalStateException If no valid data cache is set. 209 */ 210 private void setDataFile(String fileName) { 211 212 if (null == fileName) 213 throw new NullPointerException("Cannot read data from a null file name"); 214 215 File file = new File(fileName); 216 217 closeDataReader(); 218 dataFile = file; 219 } 220 221 222 private synchronized void closeDataReader() { 223 224 if (null == dataReader) 225 return; 226 227 synchronized(dataReader) { 228 try { 229 dataReader.close(); 230 } catch(IOException e) { 231 LiveGraph.application().guiManager().logErrorLn(ThrowableTools.stackTraceToString(e)); 232 } finally { 233 dataReader = null; 234 } 235 } 236 } 237 238 /** 239 * Tells whether this invoker's reader's underlying data stream is ready to be read. 240 * 241 * @return {@code true} if the next {@code readFromStream()} is guaranteed not to block for input, 242 * {@code false} otherwise. Note that returning {@code false} does not guarantee that the next read will block. 243 * @throws IOException If an I/O error occurs. 244 */ 245 public synchronized boolean ready() throws IOException { 246 247 if (null == dataReader) 248 return false; 249 250 try { 251 synchronized (dataReader) { 252 return dataReader.ready(); 253 } 254 } catch(NullPointerException e) { 255 // This will happen when dataReader became null after the check. It's ok, just return. 256 return false; 257 } 258 } 259 260 private void raiseCannotInitiateUpdate(String message) { 261 Event<DataUpdateEvent> event = new Event<DataUpdateEvent>(this, DataUpdateEvent.class, 262 DataUpdateEvent.UPDIN_CannotInitiateUpdate, 263 null == message ? "" : message); 264 LiveGraph.application().eventManager().raiseEvent(event); 265 } 266 267 /** 268 * Creates a new {@link DataStreamToCacheReader} for a stream on the currently set data file. 269 * Called by {@link #requestUpdate()} either when no data reader is available 270 * ({@code dataReader} is {@code null}), or if a data reader is available, but "do not cacha data" 271 * is activated. 272 * 273 * @return {@code true} if a data reader was opened, {@code false} if a data reader could not be 274 * successfully opened (also raises an {@code UPDIN_CannotInitiateUpdate}-event). 275 */ 276 private boolean openDataFileReader() { 277 278 if (null == dataFile || 0 == dataFile.getPath().length()) { 279 raiseCannotInitiateUpdate("No data input specified"); 280 return false; 281 } 282 283 closeDataReader(); 284 285 try { 286 dataReader = new DataStreamReader(new FileInputStream(dataFile)); 287 } catch(FileNotFoundException e) { 288 raiseCannotInitiateUpdate("File not found: " + dataFile.getName()); 289 LiveGraph.application().guiManager().logErrorLn(e.getMessage()); 290 return false; 291 } catch (Exception e) { 292 raiseCannotInitiateUpdate(e.getMessage()); 293 LiveGraph.application().guiManager().logErrorLn(e.getMessage()); 294 return false; 295 } 296 297 return true; 298 } 299 300 /** 301 * Raises a {@code UPDIN_InitiateUpdate}-event to notify the {@code DataStreamToCacheReader} that 302 * it is time for a data update. 303 */ 304 public synchronized void requestUpdate() { 305 306 lastUpdateTime = System.currentTimeMillis(); 307 308 // Reopen data reader: 309 if (null == dataReader || LiveGraph.application().getDataFileSettings().getDoNotCacheData()) { 310 311 if (!openDataFileReader()) 312 return; 313 } 314 315 try { 316 synchronized (dataReader) { 317 318 if (null == dataReader) 319 return; 320 321 Event<DataUpdateEvent> event = new Event<DataUpdateEvent>( 322 this, DataUpdateEvent.class, 323 DataUpdateEvent.UPDIN_InitiateUpdate, 324 LiveGraph.application().getDataFileSettings().getDoNotCacheData(), 325 0L, Double.NaN, 326 dataReader); 327 LiveGraph.application().eventManager().raiseEvent(event); 328 } 329 } catch(NullPointerException e) { 330 // This will happen when dataReader became null after the check or when it could not be created. 331 // It's ok, just return. 332 } 333 } 334 335 /** 336 * Send the this invoker to sleep for {@code timeTickLength} milliseconds. 337 * When it wakes it, internal time state is updated an the observers notified. 338 */ 339 private void timeTick() { 340 341 synchronized(this) { 342 343 try { 344 if (interval < 0) 345 this.wait(); 346 else 347 this.wait(timeTickLength); 348 } catch(InterruptedException e) { } 349 350 sinceUpdateTime = System.currentTimeMillis() - lastUpdateTime; 351 remainingMillis = interval <= 0 ? -1 : Math.max(0, interval - sinceUpdateTime); 352 } 353 354 raiseTimerTick(); 355 } 356 357 /** 358 * Raises an event to notify listeners that this invoker has waken up to process events. 359 * This gives listeners displaying various information about this invoker a chance to 360 * update their state. 361 */ 362 private void raiseTimerTick() { 363 Event<DataUpdateEvent> event = new Event<DataUpdateEvent>(this, DataUpdateEvent.class, 364 DataUpdateEvent.UPDIN_TimerTick); 365 LiveGraph.application().eventManager().raiseEvent(event); 366 } 367 368 369 /** 370 * Winds up the operations by closing the current data reader. 371 */ 372 private void tidyUp() { 373 closeDataReader(); 374 } 375 376 /** 377 * Main invoker loop: 378 * call {@link #timeTick()}; 379 * if it is time for the next update, call {@link #requestUpdate()}; 380 * call {@link #timeTick()} again and continue the loop until {@link #mustQuit} is set to true; 381 * call {@link #tidyUp()} before quitting. 382 */ 383 public void run() { 384 385 while (true) { 386 387 try { 388 synchronized (this) { 389 if (mustQuit) { 390 tidyUp(); 391 return; 392 } 393 } 394 395 timeTick(); 396 397 if (sinceUpdateTime >= interval && interval > 0) 398 requestUpdate(); 399 } catch(Throwable e) { 400 LiveGraph.application().guiManager().logErrorLn(ThrowableTools.stackTraceToString(e)); 401 } 402 } 403 } 404 405 /** 406 * Uses a pipe buffer of 5 MB. To customise the buffer size, create your own streams and use 407 * {@link #startMemoryStreamMode(InputStream)} to initiate the memory stream mode. 408 * @return xxx 409 */ 410 public DataStreamWriter startMemoryStreamMode() { 411 412 try { 413 PipedInputStream ins = new PipedInputStream(5 * 1024 * 1024); // 5 MB 414 DataStreamWriter outw = DataStreamWriterFactory.createDataWriter(ins); 415 if (!startMemoryStreamMode(ins)) 416 return null; 417 return outw; 418 } catch(IOException e) { 419 return null; 420 } 421 422 } 423 424 public boolean startMemoryStreamMode(InputStream in) throws IOException { 425 426 // Check not null 427 if (null == in) 428 throw new NullPointerException("Cannot use a null stream for memory stream mode"); 429 430 // Check stream class: 431 if (in instanceof java.io.PipedInputStream) { 432 final String correctClassName = org.LiveGraph.dataFile.read.PipedInputStream.class.getCanonicalName(); 433 throw new IllegalArgumentException( 434 String.format("Streams of type java.io.PipedInputStream are incompatible with the memory" 435 + " stream mode. %nUse %1$s instead. %n" 436 + "For further info see the API reference for %1$s.", 437 correctClassName)); 438 } 439 440 // Check stream open: 441 in.available(); 442 443 // Validate switching to memory stream mode: 444 EventManager eventManager = LiveGraph.application().eventManager(); 445 Event<DataUpdateEvent> startEvent = new Event<DataUpdateEvent>(this, DataUpdateEvent.class, 446 DataUpdateEvent.UPDIN_StartMemoryStreamMode); 447 try { 448 if (!eventManager.validateEvent(startEvent)) 449 return false; 450 } catch(EventProcessingException e) { 451 return false; 452 } 453 454 // Memory stream is only possible in caching mode: 455 DataFileSettings dfs = LiveGraph.application().getDataFileSettings(); 456 dfs.setDoNotCacheData(false); 457 if (dfs.getDoNotCacheData()) 458 return false; 459 460 // Make sure all previous events are processed: 461 LiveGraph.application().eventManager().waitForEvents(); 462 463 synchronized (this) { 464 465 // Close old reader and open memory stream reader: 466 closeDataReader(); 467 resetCache(null); 468 dataReader = new DataStreamReader(in); 469 memoryStreamMode = true; 470 eventManager.raiseEvent(startEvent); 471 return true; 472 } 473 } 474 475 public synchronized boolean endMemoryStreamMode() { 476 477 // Validate switching: 478 Event<DataUpdateEvent> endEvent = new Event<DataUpdateEvent>(this, DataUpdateEvent.class, 479 DataUpdateEvent.UPDIN_EndMemoryStreamMode); 480 try { 481 if (!LiveGraph.application().eventManager().validateEvent(endEvent)) 482 return false; 483 } catch(EventProcessingException e) { 484 return false; 485 } 486 487 // Close the memory stream reader: 488 closeDataReader(); 489 memoryStreamMode = false; 490 491 // Set the data file and cache mode according to the current settings: 492 setDataFile(LiveGraph.application().getDataFileSettings().getDataFile()); 493 494 LiveGraph.application().eventManager().raiseEvent(endEvent); 495 return true; 496 } 497 498 /** 499 * Permits to register as listener with the main LiveGraph event manager and 500 * only with the main LiveGraph event manager. 501 * 502 * @param manager The {@code EventManager} for the registering attempt. 503 * @return {@code (LiveGraph.application().eventManager() == manager)}. 504 * @see EventListener#permissionRegisterWithEventManager(EventManager) 505 */ 506 public boolean permissionRegisterWithEventManager(EventManager manager) { 507 return LiveGraph.application().eventManager() == manager; 508 } 509 510 /** 511 * Does not permit any unregistering. 512 * 513 * @param manager The {@code EventManager} for the registering attempt. 514 * @return {@code false}. 515 * @see EventListener#permissionUnregisterWithEventManager(EventManager) 516 */ 517 public boolean permissionUnregisterWithEventManager(EventManager manager) { 518 return false; 519 } 520 521 /** 522 * Does nothing. 523 * 524 * @param manager The {@code EventManager} with which this {@code EventListener} is now registered. 525 * @see EventListener#completedRegisterWithEventManager(EventManager) 526 */ 527 public void completedRegisterWithEventManager(EventManager manager) { } 528 529 /** 530 * Does nothing. 531 * 532 * @param manager The {@code EventManager} with which this {@code EventListener} is now unregistered. 533 * @see EventListener#completedUnregisterWithEventManager(EventManager) 534 */ 535 public void completedUnregisterWithEventManager(EventManager manager) { } 536 537 /** 538 * Does nothing. 539 * 540 * @param event An event in which this {@code EventListener} may be interested. 541 * @return {@code false}. 542 * @see EventListener#checkEventInterest(Event) 543 */ 544 public boolean checkEventInterest(Event<? extends EventType> event) { 545 return false; 546 } 547 548 /** 549 * Validates (or not) settings event. When in "memory stream mode" this invoker will 550 * not valudate changing of cache mode (all-data/tail-only or cache/dont-cache) or changing 551 * the data file. 552 * 553 * @param event The event to be validated. 554 * @param soFar Whether {@code event} has been successfuly validated by whichever {@code EventListener}s 555 * (if any) were invoked to validate {@code event} before this {@code EventListener}. 556 * 557 * @return {@code false} if this invoker is in {@code memoryStreamMode} and if the event is of the types 558 * {@code [DFS_DataFile, DFS_DoNotCacheData, DFS_ShowOnlyTailData, DFS_Load]}. 559 * 560 * @see EventListener#checkEventValid(Event, boolean) 561 */ 562 public boolean checkEventValid(Event<? extends EventType> event, boolean soFar) { 563 564 if (!memoryStreamMode) 565 return true; 566 567 EventType eventType = event.getType(); 568 if (SettingsEvent.DFS_DataFile == eventType 569 || SettingsEvent.DFS_DoNotCacheData == eventType 570 || SettingsEvent.DFS_ShowOnlyTailData == eventType 571 || SettingsEvent.DFS_Load == eventType) { 572 573 synchronized(this) { 574 if (memoryStreamMode) 575 return false; 576 } 577 } 578 579 return true; 580 } 581 582 /** 583 * Processes events. 584 * 585 * @param event Event to process. 586 */ 587 public void eventRaised(Event<? extends EventType> event) { 588 589 if (null == event) 590 return; 591 592 if (event.getDomain() == SettingsEvent.class) { 593 processSettingsEvent(event.cast(SettingsEvent.class)); 594 } 595 } 596 597 /** 598 * When the application's settings change, this method is called in order 599 * to update the internal state accordingly. 600 * 601 * @param event Describes the change event. 602 */ 603 private void processSettingsEvent(Event<SettingsEvent> event) { 604 605 switch(event.getType()) { 606 607 case DFS_DataFile: 608 synchronized(this) { 609 if (memoryStreamMode) break; 610 setDataFile((String) event.getInfoObject()); 611 resetCache(null); 612 requestUpdate(); 613 } 614 break; 615 616 case DFS_UpdateFrequency: 617 setInterval(event.getInfoLong()); 618 break; 619 620 case DFS_DoNotCacheData: 621 synchronized(this) { 622 if (memoryStreamMode) break; 623 closeDataReader(); 624 if (!event.getInfoBoolean()) 625 resetCache(null); 626 } 627 break; 628 629 case DFS_ShowOnlyTailData: 630 synchronized(this) { 631 if (memoryStreamMode) break; 632 DataCache.CacheMode newMode = (event.getInfoBoolean() 633 ? DataCache.CacheMode.CacheTailData 634 : DataCache.CacheMode.CacheAllData); 635 DataCache.CacheMode oldMode = (dataCache.getCacheMode()); 636 637 if (newMode != oldMode) { 638 639 closeDataReader(); 640 resetCache(newMode); 641 requestUpdate(); 642 } 643 } 644 break; 645 646 case DFS_Load: 647 synchronized(this) { 648 if (memoryStreamMode) break; 649 DataFileSettings dfs = LiveGraph.application().getDataFileSettings(); 650 setInterval(dfs.getUpdateFrequency()); 651 setDataFile(dfs.getDataFile()); 652 resetCache(dfs.getShowOnlyTailData() 653 ? DataCache.CacheMode.CacheTailData 654 : DataCache.CacheMode.CacheAllData); 655 requestUpdate(); 656 } 657 break; 658 659 default: 660 break; 661 } 662 } 663 664 /** 665 * Objects of this class do not handle {@code eventProcessingFinished} notifications. 666 * 667 * @param event Ignored. 668 */ 669 public void eventProcessingFinished(Event<? extends EventType> event) { } 670 671 /** 672 * Objects of this class do not handle {@code eventProcessingException} notofications. 673 * 674 * @param event Ignored. 675 * @param exception Never actually thrown. 676 * @return {@code false}. 677 */ 678 public boolean eventProcessingException(Event<? extends EventType> event, EventProcessingException exception) { 679 return false; 680 } 681 682 } // public class UpdateInvoker