using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading;
using CSVReader;
using Networkreader;
using UnityEngine;
using Logger;
using System.Diagnostics;
namespace Processor
{
/// <summary>
/// Streams <see cref="InputObject"/> entries from an <see cref="ICSVReader"/> into a buffer
/// on a background thread and hands the buffered entries out via <see cref="ReadNextValues"/>.
/// The file is either a static CSV file or one that is written by an <see cref="INWReader"/>.
/// </summary>
public class Processor : IProcessor
{
    #region Parameter
    /// <summary>
    /// Time the network constructor waits for the first data to arrive and the file to be created.
    /// </summary>
    private const int FileCreationWaitMilliseconds = 1000;

    /// <summary>
    /// Maximum time to wait for the acquisition thread to exit.
    /// </summary>
    private const int ThreadExitTimeoutMilliseconds = 5000;

    /// <summary>
    /// Pause of the acquisition thread when no entry is due, so it does not spin on a core.
    /// </summary>
    private const int IdleSleepMilliseconds = 1;

    /// <summary>
    /// Guards <see cref="Buffer"/>, the streaming parameters (<see cref="Forward"/>,
    /// <see cref="SampleRate"/>, <see cref="LastReadTime"/>) and the reader calls that
    /// inspect or move the read position (Peek, ReadNextEntry, IsInBounds, JumpToEntry).
    /// </summary>
    private readonly object LockObject = new object();

    /// <summary>
    /// The latest read timestamp.
    /// </summary>
    private double CurrentTimestamp { get; set; }

    /// <summary>
    /// The timestamp peeked from the reader when the buffer was last read externally
    /// (or the timestamp of the last jump).
    /// </summary>
    private double LastReadTime { get; set; }

    /// <summary>
    /// Buffers the read objects.
    /// </summary>
    /// <remarks>
    /// The acquisition thread only adds entries whose timestamp is less than
    /// 1.0 / <see cref="SampleRate"/> beyond <see cref="LastReadTime"/> in the read direction.
    /// </remarks>
    private readonly List<InputObject> Buffer = new List<InputObject>();

    /// <summary>
    /// The read direction.
    /// </summary>
    private bool Forward { get; set; }

    /// <summary>
    /// Handle on the acquisition thread.
    /// </summary>
    private Thread AquisitionThreadHandle { get; set; }

    /// <summary>
    /// Set this to false to stop the acquisition thread.
    /// Volatile because the thread polls it outside of any lock.
    /// </summary>
    private volatile bool Streaming;

    /// <summary>
    /// The sample rate used to size the read window (1.0 / SampleRate).
    /// </summary>
    private int SampleRate { get; set; }

    /// <summary>
    /// The instance of the <see cref="ICSVReader"/> that interacts with the file.
    /// </summary>
    private readonly ICSVReader ObjectReader;

    /// <summary>
    /// The instance of the <see cref="INWReader"/> that manages the tcp stream.
    /// Null when reading from a static file.
    /// </summary>
    private readonly INWReader NWReader;
    #endregion

    #region Constructor
    /// <summary>
    /// Use this constructor to read from a static file.
    /// </summary>
    /// <param name="filename">Name of the file that should be streamed from. Has to be located in .../Assets/CSVInput/ .</param>
    public Processor(string filename)
    {
        ObjectReader = new UnitCSVReader(filename);
    }

    /// <summary>
    /// Use this constructor to read from a network stream.
    /// </summary>
    /// <param name="ipAdress">IP address of the stream server.</param>
    /// <param name="port">Port of the stream server.</param>
    public Processor(string ipAdress, int port)
    {
        NWReader = new NWreader(ipAdress, port, 250);
        string file = NWReader.StartNWRead();
        // Wait for first data and create file.
        Thread.Sleep(FileCreationWaitMilliseconds);
        ObjectReader = new UnitCSVReader(file);
    }
    #endregion

    #region Exposed
    /// <summary>
    /// Resets the streaming state and starts the acquisition thread.
    /// A thread that is still running from an earlier call is stopped first.
    /// </summary>
    /// <param name="samplerate">Sample rate used to size the read window; must be positive.</param>
    /// <param name="forward">True to read forward, false to read backward.</param>
    /// <returns>
    /// True if the thread was started; false if <paramref name="samplerate"/> is not positive
    /// or a previously started thread did not exit in time.
    /// </returns>
    public bool StartStreaming(int samplerate, bool forward = true)
    {
        if (samplerate <= 0)
        {
            // 1.0 / samplerate would be infinite or negative and break the read window.
            CityLogger.LogError("Samplerate of: " + samplerate + " is not positive, unable to start streaming");
            return false;
        }

        // Never run two acquisition threads on the same reader.
        if (!StopAquisitionThread())
            return false;

        // Prepare for streaming.
        lock (LockObject)
        {
            SampleRate = samplerate;
            Forward = forward;
            CurrentTimestamp = 0;
            LastReadTime = 0;
            Buffer.Clear();
        }

        // Start streaming.
        Streaming = true;
        Thread worker = new Thread(AquisitionThread);
        // Background thread: must not keep the process alive if StopStreaming is never called.
        worker.IsBackground = true;
        AquisitionThreadHandle = worker;
        worker.Start();
        return true;
    }

    /// <summary>
    /// Moves the reader to the given timestamp and discards everything buffered so far.
    /// </summary>
    /// <param name="timestamp">The timestamp to jump to.</param>
    /// <param name="actualTimestamp">The timestamp returned by the reader for the jump, or -1 on failure.</param>
    /// <returns>True on success; false if the reader reports the timestamp as out of bounds.</returns>
    public bool JumpToTimestamp(double timestamp, out double actualTimestamp)
    {
        // Jump and buffer reset happen atomically so the acquisition thread cannot
        // slip in an entry from the old position in between.
        lock (LockObject)
        {
            if (ObjectReader.IsInBounds(timestamp))
            {
                double realStartTime = ObjectReader.JumpToEntry(timestamp, Forward);
                CurrentTimestamp = realStartTime;
                LastReadTime = realStartTime;
                Buffer.Clear();
                actualTimestamp = realStartTime;
                return true;
            }
        }

        CityLogger.LogError("Starttime of: " + timestamp.ToString() + " was not in bounds, unable to jump to timestamp");
        actualTimestamp = -1;
        return false;
    }

    /// <summary>
    /// Returns a copy of all buffered entries, empties the buffer and moves the
    /// read window on to the timestamp currently peeked from the reader.
    /// </summary>
    /// <returns>The entries buffered since the last call; empty if there are none.</returns>
    public List<InputObject> ReadNextValues()
    {
        lock (LockObject)
        {
            List<InputObject> snapshot = new List<InputObject>(Buffer);
            Buffer.Clear();
            LastReadTime = ObjectReader.Peek(Forward);
            return snapshot;
        }
    }

    /// <summary>
    /// Inverts the read direction and discards everything buffered so far.
    /// </summary>
    public void ReverseTime()
    {
        // Flip and clear under one lock so no entry read in the old direction survives.
        lock (LockObject)
        {
            Forward = !Forward;
            Buffer.Clear();
        }
    }

    /// <summary>
    /// Changes the sample rate used to size the read window.
    /// Non-positive values are rejected and leave the current sample rate untouched.
    /// </summary>
    /// <param name="newSamplerate">The new sample rate; must be positive.</param>
    public void ChangeSamplerate(int newSamplerate)
    {
        if (newSamplerate <= 0)
        {
            CityLogger.LogError("Samplerate of: " + newSamplerate + " is not positive, samplerate was not changed");
            return;
        }

        lock (LockObject)
            SampleRate = newSamplerate;
    }

    /// <summary>
    /// Stops the acquisition thread, empties the buffer and shuts down the readers.
    /// </summary>
    public void StopStreaming()
    {
        // Wait for the thread first so it never touches a reader that is already torn down.
        StopAquisitionThread();

        lock (LockObject)
            Buffer.Clear();

        if (NWReader != null)
            NWReader.StopReading();
        if (ObjectReader != null)
            ObjectReader.TearDown();
    }

    /// <summary>
    /// Returns the newest timestamp, taken from the network reader if there is one,
    /// otherwise from the file reader.
    /// </summary>
    public double GetNewestTimeStamp()
    {
        return NWReader != null
            ? NWReader.GetNewestTimestamp()
            : ObjectReader.GetLastTimestamp();
    }

    /// <summary>
    /// Returns the oldest timestamp, taken from the network reader if there is one,
    /// otherwise from the file reader.
    /// </summary>
    public double GetOldestTimeStamp()
    {
        return NWReader != null
            ? NWReader.GetOldestTimestamp()
            : ObjectReader.GetFirstTimestamp();
    }

    /// <summary>
    /// Returns the chunk start timestamp reported by the file reader.
    /// </summary>
    public double GetChunkStarttimestamp()
    {
        return ObjectReader.GetChunkStarttimestamp();
    }

    /// <summary>
    /// Returns the chunk end timestamp reported by the file reader.
    /// </summary>
    public double GetChunkEndtimestamp()
    {
        return ObjectReader.GetChunkEndtimestamp();
    }

    /// <summary>
    /// Forwards the chunk start timestamp to the file reader.
    /// </summary>
    /// <param name="time">The new chunk start timestamp.</param>
    public void SetChunkStarttimestamp(double time)
    {
        ObjectReader.SetChunkStarttimestamp(time);
    }

    /// <summary>
    /// Forwards the chunk end timestamp to the file reader.
    /// </summary>
    /// <param name="time">The new chunk end timestamp.</param>
    public void SetChunkEndtimestamp(double time)
    {
        ObjectReader.SetChunkEndtimestamp(time);
    }
    #endregion

    #region Internal
    /// <summary>
    /// Body of the acquisition thread: buffers due entries until <see cref="Streaming"/>
    /// is set to false, sleeping briefly whenever nothing could be buffered.
    /// </summary>
    private void AquisitionThread()
    {
        while (Streaming)
        {
            bool buffered = false;
            try
            {
                lock (LockObject)
                    buffered = TryBufferNextEntry();
            }
            catch (Exception e)
            {
                CityLogger.LogWarning("Unable to buffer value from file: " + e.Message);
            }

            // Nothing due (or the read failed): back off instead of spinning / flooding the log.
            if (!buffered)
                Thread.Sleep(IdleSleepMilliseconds);
        }
    }

    /// <summary>
    /// Reads the next entry into the buffer if its timestamp lies inside the read window.
    /// Must be called while holding <see cref="LockObject"/>.
    /// </summary>
    /// <returns>True if an entry was added to the buffer.</returns>
    private bool TryBufferNextEntry()
    {
        // Snapshot the direction so Peek and ReadNextEntry always agree.
        bool forward = Forward;

        double peekTimestamp = ObjectReader.Peek(forward);
        if (double.IsNaN(peekTimestamp))
            return false;

        double window = 1.0 / SampleRate;
        bool due = forward
            ? peekTimestamp < LastReadTime + window
            : peekTimestamp > LastReadTime - window;
        if (!due)
            return false;

        InputObject obj = ObjectReader.ReadNextEntry(forward);
        if (obj == null)
            return false;

        CurrentTimestamp = obj.Time;
        Buffer.Add(obj);
        return true;
    }

    /// <summary>
    /// Signals the acquisition thread to stop and waits for it to exit.
    /// </summary>
    /// <returns>True if no thread is running anymore; false if the wait timed out.</returns>
    private bool StopAquisitionThread()
    {
        Streaming = false;

        Thread worker = AquisitionThreadHandle;
        if (worker == null || !worker.IsAlive)
            return true;

        // Blocking join instead of a busy-wait loop.
        if (worker.Join(ThreadExitTimeoutMilliseconds))
            return true;

        CityLogger.LogWarning("Waiting for closing of read thread ended in timeout");
        return false;
    }
    #endregion
}
}