using System; using System.Collections; using System.Collections.Generic; using System.Threading; using CSVReader; using Networkreader; using UnityEngine; using Logger; using System.Diagnostics; namespace Processor { public class Processor : IProcessor { #region Parameter /// /// Lockobject to lock binary reader pointer. /// private readonly object LockObject; /// /// The latest read timestamp. /// private double CurrentTimestamp { get; set; } /// /// the timestamp of the newest object when the buffer was externaly read. /// private double LastReadTime { get; set; } /// /// Buffers the read Objects. /// /// /// I should contain all objects that satisfy: +- 1.0 / . /// private List Buffer { get; set; } /// /// The read direction. /// private bool Forward { get; set; } /// /// Handle on the AquisitionThread. /// private Thread AquisitionThreadHandle { get; set; } /// /// Set this to false to stop the . /// private bool Streaming { get; set; } /// /// The samplerate that was used to write the file. /// private int SampleRate { get; set; } /// /// The Instance of the that interacts with the file. /// private ICSVReader ObjectReader; /// /// The Instance of the manage the tcp stream. /// private INWReader NWReader { get; set; } #endregion #region Constructor /// /// Use this constructor to read from a static file. /// /// Name of the file that should be streamed from. Has to be located in .../Assets/CSVInput/ . public Processor(string filename) { Buffer = new List(); LockObject = new object(); ObjectReader = new UnitCSVReader(filename); } /// /// Use this constructor to read from a Network stream. /// /// IPAdress of stream server. /// Port of stream server. public Processor(string ipAdress, int port) { Buffer = new List(); LockObject = new object(); NWReader = new NWreader(ipAdress, port, 250); string file = NWReader.StartNWRead(); //wait for first data and create file. Thread.Sleep(1000); ObjectReader = new UnitCSVReader(file); } #endregion #region Exposed public bool StartStreaming(int samplerate, bool forward = true) { //Prepare for streaming SampleRate = samplerate; Forward = forward; CurrentTimestamp = 0; LastReadTime = 0; Streaming = true; lock (LockObject) Buffer.Clear(); //start streaming AquisitionThreadHandle = new Thread(AquisitionThread); AquisitionThreadHandle.Start(); return true; } public bool JumpToTimestamp(double timestamp, out double actualTimestamp) { if (!ObjectReader.IsInBounds(timestamp)) { CityLogger.LogError("Starttime of: " + timestamp.ToString() + " was not in bounds, unable to jump to timestamp"); actualTimestamp = -1; return false; } double realStartTime = ObjectReader.JumpToEntry(timestamp, Forward); CurrentTimestamp = realStartTime; LastReadTime = realStartTime; lock (LockObject) Buffer.Clear(); actualTimestamp = realStartTime; return true; } public List ReadNextValues() { List mBuffer; lock (LockObject) { mBuffer = new List(Buffer); Buffer.Clear(); } LastReadTime = ObjectReader.Peek(Forward); return mBuffer; } public void ReverseTime() { lock (LockObject) Buffer.Clear(); Forward = !Forward; } public void ChangeSamplerate(int newSamplerate) { SampleRate = newSamplerate; } public void StopStreaming() { Streaming = false; lock (LockObject) Buffer.Clear(); if (NWReader != null) NWReader.StopReading(); if (ObjectReader != null) ObjectReader.TearDown(); if (AquisitionThreadHandle == null || !AquisitionThreadHandle.IsAlive) return; Stopwatch watch = new Stopwatch(); watch.Start(); //stall till thread is exited while (AquisitionThreadHandle.IsAlive) { if (watch.ElapsedMilliseconds > 5000) { watch.Stop(); CityLogger.LogWarning("Waiting for closing of read thread ended in timeout"); return; } } watch.Stop(); } public double GetNewestTimeStamp() { if (NWReader != null) return NWReader.GetNewestTimestamp(); else return ObjectReader.GetLastTimestamp(); } public double GetOldestTimeStamp() { if (NWReader != null) return NWReader.GetOldestTimestamp(); else return ObjectReader.GetFirstTimestamp(); } public double GetChunkStarttimestamp(){ return ObjectReader.GetChunkStarttimestamp(); } public double GetChunkEndtimestamp(){ return ObjectReader.GetChunkEndtimestamp(); } public void SetChunkStarttimestamp(double time){ ObjectReader.SetChunkStarttimestamp(time); } public void SetChunkEndtimestamp(double time){ ObjectReader.SetChunkEndtimestamp(time); } #endregion #region Internal private void AquisitionThread() { while (Streaming) { try { double peekTimestamp = ObjectReader.Peek(Forward); if (Forward ? (!double.IsNaN(peekTimestamp) && peekTimestamp < LastReadTime + 1.0 / SampleRate) : !double.IsNaN(peekTimestamp) && peekTimestamp > LastReadTime - 1.0 / SampleRate) { InputObject obj = ObjectReader.ReadNextEntry(Forward); if (obj != null) { CurrentTimestamp = obj.Time; lock (LockObject) Buffer.Add(obj); } } } catch (Exception e) { CityLogger.LogWarning("Unable to buffer value from file: " + e.Message); } } } #endregion } }