123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244 |
- using System;
- using System.Collections;
- using System.Collections.Generic;
- using System.Threading;
- using CSVReader;
- using Networkreader;
- using UnityEngine;
- using Logger;
- using System.Diagnostics;
namespace Processor
{
    /// <summary>
    /// Buffers <see cref="InputObject"/> entries read through an <see cref="ICSVReader"/> on a dedicated
    /// acquisition thread and hands the buffered entries out via <see cref="ReadNextValues"/>.
    /// </summary>
    public class Processor : IProcessor
    {
        #region Parameter

        /// <summary>
        /// Maximum time <see cref="StopStreaming"/> waits for the acquisition thread to exit, in milliseconds.
        /// </summary>
        private const int StopTimeoutMilliseconds = 5000;

        /// <summary>
        /// Pause between starting the network reader and opening the file it reported, in milliseconds.
        /// </summary>
        private const int FirstDataWaitMilliseconds = 1000;

        /// <summary>
        /// Pause of the acquisition thread after an iteration that buffered nothing, in milliseconds.
        /// Keeps the thread from spinning at full speed while no entry is inside the read window.
        /// </summary>
        private const int IdleSleepMilliseconds = 1;

        /// <summary>
        /// Guards every access to <see cref="Buffer"/>.
        /// </summary>
        private readonly object LockObject;

        /// <summary>
        /// Backing field of <see cref="Streaming"/>. Volatile because it is written by the caller's thread
        /// and polled by the acquisition thread.
        /// </summary>
        private volatile bool streaming;

        /// <summary>
        /// The timestamp of the entry most recently buffered by the acquisition thread.
        /// </summary>
        private double CurrentTimestamp { get; set; }

        /// <summary>
        /// The timestamp of the next unread entry at the moment the buffer was last read externally.
        /// </summary>
        private double LastReadTime { get; set; }

        /// <summary>
        /// Buffers the read objects.
        /// </summary>
        /// <remarks>
        /// It should contain all objects that satisfy: <see cref="LastReadTime"/> +- 1.0 / <see cref="SampleRate"/>.
        /// </remarks>
        private List<InputObject> Buffer { get; set; }

        /// <summary>
        /// The read direction; true reads forward.
        /// </summary>
        private bool Forward { get; set; }

        /// <summary>
        /// Handle on the acquisition thread.
        /// </summary>
        private Thread AquisitionThreadHandle { get; set; }

        /// <summary>
        /// Set this to false to stop the <see cref="AquisitionThreadHandle"/>.
        /// </summary>
        private bool Streaming
        {
            get { return streaming; }
            set { streaming = value; }
        }

        /// <summary>
        /// The samplerate that was used to write the file.
        /// </summary>
        private int SampleRate { get; set; }

        /// <summary>
        /// The instance of the <see cref="ICSVReader"/> that interacts with the file.
        /// </summary>
        private ICSVReader ObjectReader;

        /// <summary>
        /// The instance of the <see cref="INWReader"/> that manages the tcp stream. Null when reading a static file.
        /// </summary>
        private INWReader NWReader { get; set; }

        #endregion

        #region Constructor

        /// <summary>
        /// Use this constructor to read from a static file.
        /// </summary>
        /// <param name="filename">Name of the file that should be streamed from. Has to be located in .../Assets/CSVInput/ .</param>
        public Processor(string filename)
        {
            LockObject = new object();
            Buffer = new List<InputObject>();
            ObjectReader = new UnitCSVReader(filename);
        }

        /// <summary>
        /// Use this constructor to read from a network stream.
        /// </summary>
        /// <param name="ipAdress">IP address of the stream server.</param>
        /// <param name="port">Port of the stream server.</param>
        public Processor(string ipAdress, int port)
        {
            LockObject = new object();
            Buffer = new List<InputObject>();

            // NOTE(review): the meaning of the third argument (250) is defined by NWreader - confirm there.
            NWReader = new NWreader(ipAdress, port, 250);
            string streamedFile = NWReader.StartNWRead();

            // Wait for first data so the file exists before the CSV reader opens it.
            Thread.Sleep(FirstDataWaitMilliseconds);
            ObjectReader = new UnitCSVReader(streamedFile);
        }

        #endregion

        #region Exposed

        /// <summary>
        /// Resets the streaming state, clears the buffer and makes sure the acquisition thread is running.
        /// </summary>
        /// <param name="samplerate">Samplerate used to size the read window (1.0 / samplerate).</param>
        /// <param name="forward">True to read forward, false to read backward.</param>
        /// <returns>Always true.</returns>
        public bool StartStreaming(int samplerate, bool forward = true)
        {
            // Prepare for streaming.
            SampleRate = samplerate;
            Forward = forward;
            CurrentTimestamp = 0;
            LastReadTime = 0;
            Streaming = true;
            lock (LockObject)
            {
                Buffer.Clear();
            }

            // A thread that is still running picks up the settings above; starting a second one
            // would let two threads read from the same reader concurrently.
            Thread running = AquisitionThreadHandle;
            if (running != null && running.IsAlive)
            {
                return true;
            }

            // Start streaming. Background thread, so a missing StopStreaming call cannot keep the process alive.
            Thread worker = new Thread(AquisitionThread);
            worker.IsBackground = true;
            AquisitionThreadHandle = worker;
            worker.Start();
            return true;
        }

        /// <summary>
        /// Moves the reader to the entry for <paramref name="timestamp"/> and discards everything buffered so far.
        /// </summary>
        /// <param name="timestamp">Requested timestamp.</param>
        /// <param name="actualTimestamp">Timestamp reported by the reader after the jump, or -1 when the request was out of bounds.</param>
        /// <returns>True when the jump was performed, false when <paramref name="timestamp"/> was out of bounds.</returns>
        public bool JumpToTimestamp(double timestamp, out double actualTimestamp)
        {
            if (!ObjectReader.IsInBounds(timestamp))
            {
                CityLogger.LogError("Starttime of: " + timestamp.ToString() + " was not in bounds, unable to jump to timestamp");
                actualTimestamp = -1;
                return false;
            }

            actualTimestamp = ObjectReader.JumpToEntry(timestamp, Forward);
            CurrentTimestamp = actualTimestamp;
            LastReadTime = actualTimestamp;
            lock (LockObject)
            {
                Buffer.Clear();
            }
            return true;
        }

        /// <summary>
        /// Returns a copy of everything buffered since the last call and empties the buffer.
        /// </summary>
        /// <returns>The buffered objects; empty when nothing was buffered.</returns>
        public List<InputObject> ReadNextValues()
        {
            List<InputObject> snapshot;
            lock (LockObject)
            {
                snapshot = new List<InputObject>(Buffer);
                Buffer.Clear();
            }

            // The acquisition thread treats a NaN peek as "no entry to read". Storing NaN here would make
            // every window comparison false until the next external read, so the previous value is kept.
            double upcoming = ObjectReader.Peek(Forward);
            if (!double.IsNaN(upcoming))
            {
                LastReadTime = upcoming;
            }
            return snapshot;
        }

        /// <summary>
        /// Discards the buffered objects and flips the read direction.
        /// </summary>
        public void ReverseTime()
        {
            lock (LockObject)
            {
                Buffer.Clear();
            }
            Forward = !Forward;
        }

        /// <summary>
        /// Replaces the samplerate used to size the read window.
        /// </summary>
        /// <param name="newSamplerate">The new samplerate.</param>
        public void ChangeSamplerate(int newSamplerate)
        {
            SampleRate = newSamplerate;
        }

        /// <summary>
        /// Signals the acquisition thread to stop, waits up to <see cref="StopTimeoutMilliseconds"/> for it to exit,
        /// then clears the buffer and releases the network and file readers.
        /// </summary>
        public void StopStreaming()
        {
            Streaming = false;

            // Wait for the thread before tearing the readers down, so it never touches a released reader.
            // Join blocks without spinning; the current-thread check avoids waiting on ourselves.
            Thread worker = AquisitionThreadHandle;
            if (worker != null && worker != Thread.CurrentThread && worker.IsAlive)
            {
                if (!worker.Join(StopTimeoutMilliseconds))
                {
                    CityLogger.LogWarning("Waiting for closing of read thread ended in timeout");
                }
            }

            lock (LockObject)
            {
                Buffer.Clear();
            }
            if (NWReader != null)
            {
                NWReader.StopReading();
            }
            if (ObjectReader != null)
            {
                ObjectReader.TearDown();
            }
        }

        /// <summary>
        /// Returns the newest timestamp: from the network reader when one exists, otherwise the file reader's last timestamp.
        /// </summary>
        public double GetNewestTimeStamp()
        {
            if (NWReader == null)
            {
                return ObjectReader.GetLastTimestamp();
            }
            return NWReader.GetNewestTimestamp();
        }

        /// <summary>
        /// Returns the oldest timestamp: from the network reader when one exists, otherwise the file reader's first timestamp.
        /// </summary>
        public double GetOldestTimeStamp()
        {
            if (NWReader == null)
            {
                return ObjectReader.GetFirstTimestamp();
            }
            return NWReader.GetOldestTimestamp();
        }

        /// <summary>
        /// Forwards to the file reader's GetChunkStarttimestamp.
        /// </summary>
        public double GetChunkStarttimestamp()
        {
            return ObjectReader.GetChunkStarttimestamp();
        }

        /// <summary>
        /// Forwards to the file reader's GetChunkEndtimestamp.
        /// </summary>
        public double GetChunkEndtimestamp()
        {
            return ObjectReader.GetChunkEndtimestamp();
        }

        /// <summary>
        /// Forwards <paramref name="time"/> to the file reader's SetChunkStarttimestamp.
        /// </summary>
        public void SetChunkStarttimestamp(double time)
        {
            ObjectReader.SetChunkStarttimestamp(time);
        }

        /// <summary>
        /// Forwards <paramref name="time"/> to the file reader's SetChunkEndtimestamp.
        /// </summary>
        public void SetChunkEndtimestamp(double time)
        {
            ObjectReader.SetChunkEndtimestamp(time);
        }

        #endregion

        #region Internal

        /// <summary>
        /// Body of the acquisition thread: while <see cref="Streaming"/> is set, reads every entry whose
        /// timestamp lies inside the current read window and appends it to <see cref="Buffer"/>.
        /// </summary>
        private void AquisitionThread()
        {
            while (Streaming)
            {
                bool bufferedEntry = false;
                try
                {
                    // Read the direction once so a concurrent ReverseTime cannot mix directions
                    // between the peek, the window test and the read of a single iteration.
                    bool forward = Forward;
                    double peekTimestamp = ObjectReader.Peek(forward);
                    if (IsInsideReadWindow(peekTimestamp, forward))
                    {
                        InputObject entry = ObjectReader.ReadNextEntry(forward);
                        if (entry != null)
                        {
                            CurrentTimestamp = entry.Time;
                            lock (LockObject)
                            {
                                Buffer.Add(entry);
                            }
                            bufferedEntry = true;
                        }
                    }
                }
                catch (Exception e)
                {
                    CityLogger.LogWarning("Unable to buffer value from file: " + e.Message);
                }

                // Nothing to do right now (or the reader failed): back off briefly instead of spinning.
                if (!bufferedEntry)
                {
                    Thread.Sleep(IdleSleepMilliseconds);
                }
            }
        }

        /// <summary>
        /// Tells whether <paramref name="peekTimestamp"/> is a number and lies within one sample period
        /// (1.0 / <see cref="SampleRate"/>) of <see cref="LastReadTime"/> in the given direction.
        /// </summary>
        private bool IsInsideReadWindow(double peekTimestamp, bool forward)
        {
            if (double.IsNaN(peekTimestamp))
            {
                return false;
            }

            double samplePeriod = 1.0 / SampleRate;
            if (forward)
            {
                return peekTimestamp < LastReadTime + samplePeriod;
            }
            return peekTimestamp > LastReadTime - samplePeriod;
        }

        #endregion
    }
}
|