Processor.cs 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. using System;
  2. using System.Collections;
  3. using System.Collections.Generic;
  4. using System.Threading;
  5. using CSVReader;
  6. using Networkreader;
  7. using UnityEngine;
  8. using Logger;
  9. using System.Diagnostics;
  10. namespace Processor
  11. {
  12. public class Processor : IProcessor
  13. {
  14. #region Parameter
  15. /// <summary>
  16. /// Lockobject to lock binary reader pointer.
  17. /// </summary>
  18. private readonly object LockObject;
  19. /// <summary>
  20. /// The latest read timestamp.
  21. /// </summary>
  22. private double CurrentTimestamp { get; set; }
  23. /// <summary>
  24. /// the timestamp of the newest object when the buffer was externaly read.
  25. /// </summary>
  26. private double LastReadTime { get; set; }
  27. /// <summary>
  28. /// Buffers the read Objects.
  29. /// </summary>
  30. /// <remarks>
  31. /// I should contain all objects that satisfy: <see cref="LastReadTime"/> +- 1.0 / <see cref="SampleRate"/>.
  32. /// </remarks>
  33. private List<InputObject> Buffer { get; set; }
  34. /// <summary>
  35. /// The read direction.
  36. /// </summary>
  37. private bool Forward { get; set; }
  38. /// <summary>
  39. /// Handle on the AquisitionThread.
  40. /// </summary>
  41. private Thread AquisitionThreadHandle { get; set; }
  42. /// <summary>
  43. /// Set this to false to stop the <see cref="AquisitionThreadHandle"/>.
  44. /// </summary>
  45. private bool Streaming { get; set; }
  46. /// <summary>
  47. /// The samplerate that was used to write the file.
  48. /// </summary>
  49. private int SampleRate { get; set; }
  50. /// <summary>
  51. /// The Instance of the <see cref="ICSVReader"/> that interacts with the file.
  52. /// </summary>
  53. private ICSVReader ObjectReader;
  54. /// <summary>
  55. /// The Instance of the <see cref="INWReader"/> manage the tcp stream.
  56. /// </summary>
  57. private INWReader NWReader { get; set; }
  58. #endregion
  59. #region Constructor
  60. /// <summary>
  61. /// Use this constructor to read from a static file.
  62. /// </summary>
  63. /// <param name="filename">Name of the file that should be streamed from. Has to be located in .../Assets/CSVInput/ .</param>
  64. public Processor(string filename)
  65. {
  66. Buffer = new List<InputObject>();
  67. LockObject = new object();
  68. ObjectReader = new UnitCSVReader(filename);
  69. }
  70. /// <summary>
  71. /// Use this constructor to read from a Network stream.
  72. /// </summary>
  73. /// <param name="ipAdress">IPAdress of stream server.</param>
  74. /// <param name="port">Port of stream server.</param>
  75. public Processor(string ipAdress, int port)
  76. {
  77. Buffer = new List<InputObject>();
  78. LockObject = new object();
  79. NWReader = new NWreader(ipAdress, port, 250);
  80. string file = NWReader.StartNWRead();
  81. //wait for first data and create file.
  82. Thread.Sleep(1000);
  83. ObjectReader = new UnitCSVReader(file);
  84. }
  85. #endregion
  86. #region Exposed
  87. public bool StartStreaming(int samplerate, bool forward = true)
  88. {
  89. //Prepare for streaming
  90. SampleRate = samplerate;
  91. Forward = forward;
  92. CurrentTimestamp = 0;
  93. LastReadTime = 0;
  94. Streaming = true;
  95. lock (LockObject)
  96. Buffer.Clear();
  97. //start streaming
  98. AquisitionThreadHandle = new Thread(AquisitionThread);
  99. AquisitionThreadHandle.Start();
  100. return true;
  101. }
  102. public bool JumpToTimestamp(double timestamp, out double actualTimestamp)
  103. {
  104. if (!ObjectReader.IsInBounds(timestamp))
  105. {
  106. CityLogger.LogError("Starttime of: " + timestamp.ToString() + " was not in bounds, unable to jump to timestamp");
  107. actualTimestamp = -1;
  108. return false;
  109. }
  110. double realStartTime = ObjectReader.JumpToEntry(timestamp, Forward);
  111. CurrentTimestamp = realStartTime;
  112. LastReadTime = realStartTime;
  113. lock (LockObject)
  114. Buffer.Clear();
  115. actualTimestamp = realStartTime;
  116. return true;
  117. }
  118. public List<InputObject> ReadNextValues()
  119. {
  120. List<InputObject> mBuffer;
  121. lock (LockObject)
  122. {
  123. mBuffer = new List<InputObject>(Buffer);
  124. Buffer.Clear();
  125. }
  126. LastReadTime = ObjectReader.Peek(Forward);
  127. return mBuffer;
  128. }
  129. public void ReverseTime()
  130. {
  131. lock (LockObject)
  132. Buffer.Clear();
  133. Forward = !Forward;
  134. }
  135. public void ChangeSamplerate(int newSamplerate)
  136. {
  137. SampleRate = newSamplerate;
  138. }
  139. public void StopStreaming()
  140. {
  141. Streaming = false;
  142. lock (LockObject)
  143. Buffer.Clear();
  144. if (NWReader != null)
  145. NWReader.StopReading();
  146. if (ObjectReader != null)
  147. ObjectReader.TearDown();
  148. if (AquisitionThreadHandle == null || !AquisitionThreadHandle.IsAlive)
  149. return;
  150. Stopwatch watch = new Stopwatch();
  151. watch.Start();
  152. //stall till thread is exited
  153. while (AquisitionThreadHandle.IsAlive)
  154. {
  155. if (watch.ElapsedMilliseconds > 5000)
  156. {
  157. watch.Stop();
  158. CityLogger.LogWarning("Waiting for closing of read thread ended in timeout");
  159. return;
  160. }
  161. }
  162. watch.Stop();
  163. }
  164. public double GetNewestTimeStamp()
  165. {
  166. if (NWReader != null)
  167. return NWReader.GetNewestTimestamp();
  168. else
  169. return ObjectReader.GetLastTimestamp();
  170. }
  171. public double GetOldestTimeStamp()
  172. {
  173. if (NWReader != null)
  174. return NWReader.GetOldestTimestamp();
  175. else
  176. return ObjectReader.GetFirstTimestamp();
  177. }
  178. public double GetChunkStarttimestamp(){
  179. return ObjectReader.GetChunkStarttimestamp();
  180. }
  181. public double GetChunkEndtimestamp(){
  182. return ObjectReader.GetChunkEndtimestamp();
  183. }
  184. public void SetChunkStarttimestamp(double time){
  185. ObjectReader.SetChunkStarttimestamp(time);
  186. }
  187. public void SetChunkEndtimestamp(double time){
  188. ObjectReader.SetChunkEndtimestamp(time);
  189. }
  190. #endregion
  191. #region Internal
  192. private void AquisitionThread()
  193. {
  194. while (Streaming)
  195. {
  196. try
  197. {
  198. double peekTimestamp = ObjectReader.Peek(Forward);
  199. if (Forward ? (!double.IsNaN(peekTimestamp) && peekTimestamp < LastReadTime + 1.0 / SampleRate) : !double.IsNaN(peekTimestamp) && peekTimestamp > LastReadTime - 1.0 / SampleRate)
  200. {
  201. InputObject obj = ObjectReader.ReadNextEntry(Forward);
  202. if (obj != null)
  203. {
  204. CurrentTimestamp = obj.Time;
  205. lock (LockObject)
  206. Buffer.Add(obj);
  207. }
  208. }
  209. }
  210. catch (Exception e)
  211. {
  212. CityLogger.LogWarning("Unable to buffer value from file: " + e.Message);
  213. }
  214. }
  215. }
  216. #endregion
  217. }
  218. }