main.go 37 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "log"
  8. "math"
  9. "net"
  10. "os"
  11. "os/exec"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. // "github.com/go-echarts/go-echarts/v2/charts"
  17. // "github.com/go-echarts/go-echarts/v2/opts"
  18. // "github.com/go-echarts/go-echarts/v2/types"
  19. "github.com/go-echarts/go-echarts/v2/charts"
  20. "github.com/go-echarts/go-echarts/v2/opts"
  21. "github.com/go-echarts/go-echarts/v2/types"
  22. "github.com/gomodule/redigo/redis"
  23. )
  24. var channelSize int
  25. var connID int
  26. type Rec struct {
  27. Megred_Timestamp []string
  28. Merged_Hashtags []string
  29. Megred_User_Id string
  30. Megred_Ip string
  31. Cover_Msg string
  32. }
  33. // users in arrival phase; when from arrival to learning, agent need to send their arrival messages to agent
  34. var arrival_phase_start_list []int
  35. var arrival_phase_number_list []int //each batch new users' number
  36. var arrival_phase_number_list_lock sync.Mutex
  37. //each batch's length
  38. var average_arrival_length []int
  39. var Status sync.Map //each user' status
  40. var NameList []string //users' uid
  41. var NameList_Append_Lock sync.Mutex //
  42. var Batch_List []string //users in eahc batch
  43. var Batch_List_Append_Lock sync.Mutex
  44. var Batch_list_threshold int //batch threshold
  45. //used for receive message from clients
  46. var putInChannel chan string
  47. var receive_wg sync.WaitGroup
  48. var peocessConn_wg sync.WaitGroup
  49. var uid_channel chan string
  50. var uip_channel chan string
  51. //db14, used as user queue in arrival phase
  52. var db14_agentqueue_key chan string
  53. var db14_agentqueue_val chan string
  54. var db14_agentqueue_Lock sync.Mutex
  55. //db13,used to store users' status during learning phase
  56. var db13_learning_key chan string
  57. var db13_learning_val chan int //0-cover message;1-real message
  58. var db13_learning_Lock sync.Mutex
  59. //db12,used as user queue in online phase
  60. var db12_online_key chan string
  61. var db12_online_val chan string
  62. var db12_online_Lock sync.Mutex
  63. var online_msg_number int
  64. var online_msg_number_empty int
  65. var online_msg_in_each_group_in_each_round map[int][]int
  66. var group_size_in_each_round map[int][]int
  67. var online_msg_in_each_group_in_each_round_middle_wg sync.Mutex
  68. var online_msg_in_each_group_in_each_round_middle []int
  69. //from uid to group id
  70. var user_group_hashmap map[string]int
  71. //used in arrival to learning phase
  72. var sendDBToReceiver chan string
  73. var cleanDBToReceiver_wg sync.WaitGroup
  74. var sendDBToReceiver_lock sync.Mutex
  75. //count cover and real's number
  76. var cov_rec int
  77. var real_rec int
  78. var cov_lock sync.Mutex
  79. var real_lock sync.Mutex
  80. var packetLength int
  81. var groupSet_Number int
  82. var number_group_hashmap map[int]*Group
  83. type Group struct {
  84. Group_Id int
  85. Group_User_List []string
  86. Group_Size int
  87. Group_Size_copy int
  88. Group_Active_Threshold int
  89. Group_active bool
  90. Group_low_frequence bool
  91. Group_start_round int
  92. Group_online_sum_length int
  93. Group_delay_duration_sum time.Duration
  94. Group_outlier_flag bool
  95. Group_send_times int
  96. Group_sleep_times int
  97. }
  98. var group_list []*Group
  99. //function used to receive messages
  100. func receive(c net.Conn) { //
  101. defer receive_wg.Done()
  102. for {
  103. a, ok_return := <-putInChannel
  104. if !ok_return {
  105. return
  106. }
  107. mid := []byte(a)
  108. length := 0
  109. if a == "" {
  110. continue
  111. }
  112. for i := len(a) - 1; ; i-- {
  113. if i < 0 {
  114. log.Println("a=", a, "len(a)=", len(a), "len(mid)=", len(mid), "mid=", mid)
  115. }
  116. if mid[i] == 125 { //125=}
  117. length = i
  118. break
  119. }
  120. }
  121. mid = mid[:length+1]
  122. var stru map[string]interface{}
  123. err_unmarshal := json.Unmarshal(mid, &stru)
  124. if err_unmarshal != nil {
  125. log.Println("json err,err_unmarshal=", err_unmarshal, "len(mid=)", len(mid), "len(a)=", len(a))
  126. }
  127. var structure Rec
  128. //1,time
  129. times, err_time := redis.Strings(stru["Megred_Timestamp"], nil)
  130. if err_time != nil {
  131. log.Println("err_time", err_time, "a=", a)
  132. }
  133. structure.Megred_Timestamp = times
  134. //2,msg
  135. msgs, err_msgs := redis.Strings(stru["Megred_Hashtags"], nil)
  136. if err_msgs != nil {
  137. log.Println("err_msgs", err_msgs, "符串a", a)
  138. }
  139. structure.Merged_Hashtags = msgs
  140. //3,uid
  141. uid, err_uid := redis.String(stru["Megred_User_Id"], nil)
  142. if err_uid != nil {
  143. log.Println("err_uid", err_uid, "a", a)
  144. }
  145. structure.Megred_User_Id = uid
  146. //4,ip
  147. ip, err_ip := redis.String(stru["Megred_Ip"], nil)
  148. if err_ip != nil {
  149. log.Println("err_ip", err_ip, "a", a)
  150. }
  151. structure.Megred_Ip = ip
  152. //5.cover
  153. cov, err_cover := redis.String(stru["Cover_Msg"], nil)
  154. if err_cover != nil {
  155. log.Println("err_cover", err_cover)
  156. break
  157. }
  158. structure.Cover_Msg = cov
  159. //process users according to their status
  160. //1,new user,store it in db14
  161. staa, ok := Status.Load(structure.Megred_User_Id)
  162. if !ok {
  163. if structure.Megred_User_Id == "" {
  164. continue
  165. }
  166. Status.Store(structure.Megred_User_Id, 0)
  167. NameList_Append_Lock.Lock()
  168. NameList = append(NameList, structure.Megred_User_Id)
  169. NameList_Append_Lock.Unlock()
  170. Batch_List_Append_Lock.Lock()
  171. Batch_List = append(Batch_List, structure.Megred_User_Id)
  172. Batch_List_Append_Lock.Unlock()
  173. db14_agentqueue_Lock.Lock()
  174. db14_agentqueue_key <- structure.Megred_User_Id
  175. db14_agentqueue_val <- a
  176. db14_agentqueue_Lock.Unlock()
  177. arrival_phase_start_list = append(arrival_phase_start_list, round)
  178. continue
  179. } else {
  180. //a,arrival phase
  181. if staa == 0 {
  182. db14_agentqueue_Lock.Lock()
  183. db14_agentqueue_key <- structure.Megred_User_Id
  184. db14_agentqueue_val <- a
  185. db14_agentqueue_Lock.Unlock()
  186. continue
  187. }
  188. //b, learning phase
  189. if staa == 1 {
  190. //(1),cover message
  191. if structure.Cover_Msg == "yes" {
  192. db13_learning_Lock.Lock()
  193. db13_learning_key <- structure.Megred_User_Id
  194. db13_learning_val <- 0
  195. db13_learning_Lock.Unlock()
  196. cov_lock.Lock()
  197. cov_rec++
  198. cov_lock.Unlock()
  199. } else { //real
  200. db13_learning_Lock.Lock()
  201. db13_learning_key <- structure.Megred_User_Id
  202. db13_learning_val <- 1
  203. db13_learning_Lock.Unlock()
  204. real_lock.Lock()
  205. real_rec++
  206. real_lock.Unlock()
  207. }
  208. //(2),directly send it to recipient
  209. c.Write(mid)
  210. continue
  211. }
  212. //c,online
  213. if staa == 2 {
  214. db12_online_Lock.Lock()
  215. db12_online_key <- structure.Megred_User_Id
  216. db12_online_val <- a
  217. online_msg_number++
  218. db12_online_Lock.Unlock()
  219. online_msg_in_each_group_in_each_round_middle_wg.Lock()
  220. gid := user_group_hashmap[structure.Megred_User_Id]
  221. online_msg_in_each_group_in_each_round_middle[gid]++
  222. online_msg_in_each_group_in_each_round_middle_wg.Unlock()
  223. continue
  224. }
  225. }
  226. }
  227. }
  228. //put message into db14
  229. var db14_lpush_wg sync.WaitGroup
  230. var db14_lpush_lock sync.Mutex
  231. func db14_lpush() {
  232. defer db14_lpush_wg.Done()
  233. arrival_msg_num := 0
  234. log.Println("db14_rpush()")
  235. c_db14, _ := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
  236. c_db14.Do("select", 14)
  237. for {
  238. db14_lpush_lock.Lock()
  239. x, ok1 := <-db14_agentqueue_key
  240. y, ok2 := <-db14_agentqueue_val
  241. db14_lpush_lock.Unlock()
  242. if ok1 && ok2 {
  243. c_db14.Do("lpush", x, y)
  244. arrival_msg_num++
  245. }
  246. if (!ok1 || !ok2) && db13_db14_db12_waitForReceive_flag {
  247. log.Println("db14_lpush() ends,arrival_msg_num=", arrival_msg_num)
  248. return
  249. }
  250. }
  251. }
  252. //db13
  253. var db13_rpush_wg sync.WaitGroup
  254. var db13_rpush_lock sync.Mutex
  255. func db13_rpush() {
  256. defer db13_rpush_wg.Done()
  257. log.Println("db13_rpush()")
  258. real := 0
  259. cov := 0
  260. c_db13, _ := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
  261. c_db13.Do("select", 13)
  262. for {
  263. db13_rpush_lock.Lock()
  264. x, ok1 := <-db13_learning_key
  265. y, ok2 := <-db13_learning_val
  266. db13_rpush_lock.Unlock()
  267. if ok1 && ok2 {
  268. if y == 0 {
  269. cov = cov + 1
  270. } else {
  271. real = real + 1
  272. }
  273. c_db13.Do("rpush", x, y)
  274. }
  275. if db13_db14_db12_waitForReceive_flag && (!ok1 || !ok2) {
  276. log.Println("db13_rpush()ends,real,cov are", real, cov, "total number=", real+cov)
  277. return
  278. }
  279. }
  280. }
  281. //db12
  282. var db12_lpush_wg sync.WaitGroup
  283. var db12_lpush_lock sync.Mutex
  284. var test_list []string
  285. func db12_lpush() {
  286. defer db12_lpush_wg.Done()
  287. online_msg := 0
  288. log.Println("db12_lpush()starts")
  289. c_db12, _ := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
  290. c_db12.Do("select", 12)
  291. for {
  292. db12_lpush_lock.Lock()
  293. x, ok1 := <-db12_online_key
  294. test_list = append(test_list, x)
  295. y, ok2 := <-db12_online_val
  296. db12_lpush_lock.Unlock()
  297. if ok1 && ok2 {
  298. c_db12.Do("lpush", x, y)
  299. online_msg++
  300. }
  301. if db13_db14_db12_waitForReceive_flag && (!ok1 || !ok2) {
  302. log.Println("db12_lpush() ends,online_msg", online_msg)
  303. return
  304. }
  305. }
  306. }
  307. //init db12,13,14
  308. func initDb() {
  309. log.Println("initDb()执行完成,已经清空db13,db14")
  310. c, c_err := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
  311. if c_err != nil {
  312. log.Println("连接数据库出错,c_err=", c_err)
  313. }
  314. c.Do("select", 14)
  315. c.Do("Flushdb")
  316. c.Do("select", 13)
  317. c.Do("Flushdb")
  318. c.Do("select", 12)
  319. c.Do("Flushdb")
  320. }
  321. //remake channels and some slices
  322. func initChannelAndListAndMap() {
  323. log.Println("initChannelAndListAndMap")
  324. putInChannel = make(chan string, channelSize)
  325. db14_agentqueue_key = make(chan string, channelSize)
  326. db14_agentqueue_val = make(chan string, channelSize)
  327. db13_learning_key = make(chan string, channelSize)
  328. db13_learning_val = make(chan int, channelSize)
  329. db12_online_key = make(chan string, channelSize)
  330. db12_online_val = make(chan string, channelSize)
  331. }
  332. var db13_db14_db12_waitForReceive_flag bool
  333. //process all receive work
  334. func peocessConn(conn net.Conn, connID int) {
  335. db13_db14_db12_waitForReceive_flag = false
  336. log.Println("peocessConn starts")
  337. initChannelAndListAndMap()
  338. db14_lpush_wg.Add(1)
  339. go db14_lpush()
  340. db13_rpush_wg.Add(1)
  341. go db13_rpush()
  342. db12_lpush_wg.Add(1)
  343. go db12_lpush()
  344. //connect the recipients
  345. c, c_err := net.Dial("tcp", "192.168.32.144:20001") //recipient address
  346. if c_err != nil {
  347. log.Println("err,dial 192.168.32.144:20001 failed", c_err)
  348. }
  349. for i := 0; i < 500; i++ {
  350. receive_wg.Add(1)
  351. go receive(c)
  352. }
  353. var tmp [3000]byte
  354. index := 0
  355. total_msg_number := 0
  356. for {
  357. total_msg_number++
  358. _, err := conn.Read(tmp[:])
  359. if err != nil {
  360. log.Println("read from conn failed", err)
  361. return
  362. }
  363. if string(tmp[:8]) == "roundEnd" {
  364. log.Println("client ends the conn, total_msg_number", total_msg_number)
  365. break
  366. }
  367. //Solve the sticky bag problem
  368. if tmp[0] != 123 {
  369. var corrext_tmp []byte
  370. var corrext_follow []byte
  371. var correct_length int
  372. for i, v := range tmp[:] {
  373. if v == 123 {
  374. correct_length = i
  375. corrext_follow = make([]byte, correct_length)
  376. _, err_follow := conn.Read(corrext_follow[:])
  377. if err_follow != nil {
  378. log.Println("read from conn failed,err_follow=", err_follow)
  379. return
  380. }
  381. corrext_tmp = append(tmp[i:], corrext_follow[:]...)
  382. if len(corrext_tmp) != packetLength {
  383. log.Println("corrext_tmp length!=2000", len(corrext_tmp))
  384. }
  385. break
  386. }
  387. }
  388. putInChannel <- string(corrext_tmp[:])
  389. index++
  390. log.Println("total_msg_number=", total_msg_number)
  391. continue
  392. }
  393. putInChannel <- string(tmp[:])
  394. }
  395. close(putInChannel)
  396. receive_wg.Wait()
  397. //Determine if each anonymity set is online
  398. if len(group_list) > 0 {
  399. for _, v := range group_list {
  400. online_msg_in_each_group_in_each_round[v.Group_Id] = append(online_msg_in_each_group_in_each_round[v.Group_Id], online_msg_in_each_group_in_each_round_middle[v.Group_Id])
  401. number := float64(0)
  402. if v.Group_outlier_flag {
  403. number = float64(group_list[v.Group_Id].Group_Size*group_Threshold_outlier) / float64(100)
  404. } else {
  405. number = float64(group_list[v.Group_Id].Group_Size*group_list[v.Group_Id].Group_Active_Threshold) / float64(100)
  406. }
  407. if number >= float64(online_msg_in_each_group_in_each_round_middle[v.Group_Id]) && group_list[v.Group_Id].Group_sleep_times <= Group_sleep_times_threshold {
  408. group_list[v.Group_Id].Group_active = false
  409. group_list[v.Group_Id].Group_sleep_times++
  410. } else {
  411. group_list[v.Group_Id].Group_active = true
  412. group_list[v.Group_Id].Group_sleep_times = 0
  413. }
  414. }
  415. }
  416. db13_db14_db12_waitForReceive_flag = true
  417. close(db14_agentqueue_key)
  418. close(db14_agentqueue_val)
  419. close(db13_learning_key)
  420. close(db13_learning_val)
  421. close(db12_online_key)
  422. close(db12_online_val)
  423. db13_rpush_wg.Wait()
  424. db14_lpush_wg.Wait()
  425. db12_lpush_wg.Wait()
  426. log.Println("peocessConn ends, NameList length=", len(NameList), "Batch_Listlength =", len(Batch_List))
  427. }
  428. //send all arrival messages to recipients
  429. func cleanDBToReceiver(httpConn net.Conn) {
  430. defer cleanDBToReceiver_wg.Done()
  431. time.Sleep(time.Millisecond * 300)
  432. c, _ := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
  433. c.Do("select", 14)
  434. for {
  435. x, ok := <-sendDBToReceiver
  436. if ok {
  437. sendDBToReceiver_lock.Lock()
  438. queueLen, _ := c.Do("llen", x)
  439. ql, err_ql := redis.Int(queueLen, nil)
  440. if err_ql != nil {
  441. log.Println("db14,err_ql=", err_ql)
  442. }
  443. for i := 0; i < ql; i++ {
  444. result, err_rpop := c.Do("rpop", x)
  445. if err_rpop != nil {
  446. log.Println("db14,err_rpop=", err_rpop)
  447. }
  448. res, err_res := redis.String(result, nil)
  449. if err_res != nil {
  450. log.Println("db14,err_res=", err_res)
  451. }
  452. jsons, err_jsons := json.Marshal(res)
  453. if err_jsons != nil {
  454. log.Println("db14, err_jsons=", err_jsons)
  455. }
  456. httpConn.Write(jsons)
  457. }
  458. sendDBToReceiver_lock.Unlock()
  459. } else {
  460. return
  461. }
  462. }
  463. }
  464. var learningphase_start_round int
  465. var round int
  466. var last_Batch_List []string
  467. var last_Batch_List_to_online []string
  468. var lastLearningPhase_end_flag bool
  469. /*
  470. 1,users in batch_list,set their status to 1,means they enter the learning phase
  471. 2,clean db14中,send arrival message to recipients
  472. 3,tell clients,which users have changed their status
  473. 4,clean up batch-list
  474. 5,
  475. */
  476. func arrivalToLearn1() bool {
  477. log.Println("arrivalToLearn1()")
  478. time.Sleep(time.Millisecond * 300)
  479. //enough new users
  480. if len(Batch_List) >= Batch_list_threshold {
  481. if !lastLearningPhase_end_flag {
  482. log.Println("last learning phase still not ends, must wait")
  483. httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002")
  484. if err1 != nil {
  485. log.Println("err,dial 192.168.32.144:20002 failed", err1)
  486. }
  487. sendToCli := []byte("noChange")
  488. httpConn_client.Write(sendToCli)
  489. log.Println("arrivalToLearn1() ends, no one changes status")
  490. return false
  491. }
  492. //they can enter learning phase
  493. log.Println("before enter learning phase, last learningphase_start_round=", learningphase_start_round, "round=", round)
  494. learningphase_start_round = round
  495. log.Println("after enter learning phase, new learningphase_start_round=", learningphase_start_round, "round=", round)
  496. last_Batch_List = make([]string, len(Batch_List))
  497. copy(last_Batch_List, Batch_List)
  498. log.Println("len(Batch_List)=", len(Batch_List))
  499. //A:send arrival messages to recipients
  500. //a1,conn the recipient
  501. httpConn_receiver, err2 := net.Dial("tcp", "192.168.32.144:20001") //recipients address
  502. if err2 != nil {
  503. log.Println("receiver,dial 192.168.32.144:20001 failed", err2)
  504. }
  505. //a2,init sendDBToReceiver' channel to store uid in batch list
  506. sendDBToReceiver = make(chan string, channelSize)
  507. //a3,clean ip db14
  508. for i := 0; i < 100; i++ {
  509. cleanDBToReceiver_wg.Add(1)
  510. go cleanDBToReceiver(httpConn_receiver)
  511. }
  512. middle_sum_number := 0
  513. for _, v := range arrival_phase_start_list {
  514. middle_sum_number = middle_sum_number + round - v
  515. }
  516. arrival_phase_number_list_lock.Lock()
  517. average_arrival_length = append(average_arrival_length, middle_sum_number/len(arrival_phase_start_list))
  518. arrival_phase_number_list = append(arrival_phase_number_list, len(arrival_phase_start_list))
  519. arrival_phase_number_list_lock.Unlock()
  520. arrival_phase_start_list = make([]int, 0)
  521. //B,sned feedback to clients
  522. httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002") //client's address
  523. if err1 != nil {
  524. log.Println("err dial 192.168.32.144:20002 failed", err1)
  525. }
  526. sendToClient := ""
  527. for i, v := range Batch_List {
  528. sendDBToReceiver <- v
  529. Status.Store(v, 1)
  530. sendToClient = sendToClient + v + ","
  531. if (i%6000 == 0 && i != 0) || i == len(Batch_List)-1 {
  532. sendToCli := []byte(sendToClient)
  533. httpConn_client.Write(sendToCli)
  534. sendToClient = ""
  535. }
  536. }
  537. log.Println("Batch_List length", len(Batch_List))
  538. close(sendDBToReceiver)
  539. time.Sleep(time.Second * 2)
  540. httpConn_client.Write([]byte("changeEnd"))
  541. //C,clean Batch-list
  542. Batch_List = make([]string, 0)
  543. cleanDBToReceiver_wg.Wait()
  544. c, c_err := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
  545. if c_err != nil {
  546. log.Println("err,c_err=", c_err)
  547. }
  548. c.Do("select", 14)
  549. c.Do("flushdb")
  550. log.Println("len(Batch_List)=", len(Batch_List))
  551. log.Println("arrivalToLearn1() ends, someone has changed his status")
  552. lastLearningPhase_end_flag = false
  553. return true
  554. } else { //cannot enter learning phase
  555. httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002")
  556. if err1 != nil {
  557. log.Println("err,dial 192.168.32.144:20002 failed", err1)
  558. }
  559. sendToCli := []byte("noChange")
  560. httpConn_client.Write(sendToCli)
  561. log.Println("arrivalToLearn1() ends,no one has changed his status")
  562. return false
  563. }
  564. }
  565. //determines the length of the learnin phase; could be 24/48/72 rounds
  566. func refresh_learningPhase_flag() bool {
  567. defer log.Println("refresh_learningPhase_flag ends")
  568. log.Println("refresh_learningPhase_flag starts")
  569. if round-learningphase_start_round == 24 { //24//48//72
  570. log.Println("round=", round, "learningphase_start_round=", learningphase_start_round)
  571. lastLearningPhase_end_flag = true
  572. //In the test only the first batch of users are tested, subsequent batches of users are not allowed to enter the learning phase
  573. if round >= 33 { //32//46//80//82,72 rounds
  574. lastLearningPhase_end_flag = false
  575. return false
  576. }
  577. last_Batch_List_to_online = make([]string, len(last_Batch_List))
  578. copy(last_Batch_List_to_online, last_Batch_List)
  579. return true
  580. }
  581. return false
  582. }
  583. //from learning to online
  584. func learning_To_online_batchList(flag bool) {
  585. //a,arrival to learning doesn't happen
  586. log.Println("learning_To_online starts,flag=", flag)
  587. time.Sleep(time.Second * 1)
  588. if !flag {
  589. httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002")
  590. if err1 != nil {
  591. log.Println("err, dial 192.168.32.144:20002 failed", err1)
  592. }
  593. sendToCli := []byte("noChange")
  594. httpConn_client.Write(sendToCli)
  595. log.Println("learning_To_online,no one from learning to online")
  596. return
  597. } else { //b,it happends
  598. httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002") //client's address
  599. if err1 != nil {
  600. log.Println("err dial 192.168.32.144:20002 failed", err1)
  601. }
  602. sendToClient := ""
  603. for i, v := range last_Batch_List_to_online {
  604. //1.status set to 2
  605. Status.Store(v, 2)
  606. sendToClient = sendToClient + v + ","
  607. //2,tell clients, which users have been online
  608. if (i%6000 == 0 && i != 0) || i == len(last_Batch_List_to_online)-1 {
  609. sendToCli := []byte(sendToClient)
  610. httpConn_client.Write(sendToCli)
  611. sendToClient = ""
  612. }
  613. }
  614. time.Sleep(time.Second * 1)
  615. httpConn_client.Write([]byte("changeEnd"))
  616. }
  617. }
  618. //learning, do clustering, create anonynity set
  619. func learning_function(flag bool) {
  620. defer log.Println("learning_function ends")
  621. log.Println("start learning_function")
  622. if flag {
  623. log.Println("round=", round, "learningphase_start_round=", learningphase_start_round)
  624. /*
  625. 1,taken data from db13, use output.sh,clean db13
  626. 2,do clustering, store result ingroupresult
  627. 3,create anonymity set and record the users in each set
  628. */
  629. //1,output.sh;
  630. // /home/it/middle_data/rediskey.txt val value.csv
  631. _, err := exec.Command("/bin/bash", "/home/it/middle_data/output.sh").CombinedOutput()
  632. if err != nil {
  633. log.Println("err,cmd.Output:", err)
  634. return
  635. }
  636. log.Println("output.sh ends")
  637. log.Println("do clustering")
  638. starttime := time.Now()
  639. cmd, err_cmd := exec.Command("python3", "/home/it/learning-phase/final.py").CombinedOutput()
  640. if err_cmd != nil {
  641. log.Println("err,err_cmd=", err_cmd)
  642. return
  643. }
  644. log.Println("result", string(cmd))
  645. // count time duration
  646. cost := int(time.Since(starttime) / time.Second)
  647. duration = cost
  648. //2,combine.sh
  649. _, err_combine := exec.Command("/bin/bash", "/home/it/middle_data/combine.sh").CombinedOutput()
  650. if err_combine != nil {
  651. log.Println("err,cmd.Output:", err_combine)
  652. return
  653. }
  654. log.Println("combine.sh ends")
  655. filePath1 := "/home/it/middle_data/filtered_good_name_list.txt"
  656. content1, err1 := ioutil.ReadFile(filePath1)
  657. if err1 != nil {
  658. panic(err1)
  659. } //m1 is key,uid
  660. m1 := strings.Split(string(content1), "\n")
  661. filePath2 := "/home/it/middle_data/groupingResult.txt"
  662. content2, err2 := ioutil.ReadFile(filePath2)
  663. if err2 != nil {
  664. panic(err1)
  665. } //m2 is the clustering result,
  666. m2 := strings.Split(string(content2), "\n")
  667. log.Println("m1 length=", len(m1), "m2 length=", len(m2))
  668. //create anonynmity set
  669. for i := 0; i < k_val; i++ {
  670. var g Group
  671. g.Group_Id = i + k_val*groupSet_Number
  672. g.Group_Active_Threshold = group_Threshold_global
  673. g.Group_Size = 0
  674. g.Group_Size_copy = 0
  675. g.Group_active = false
  676. g.Group_User_List = make([]string, 0)
  677. g.Group_start_round = round
  678. g.Group_online_sum_length = 0
  679. g.Group_delay_duration_sum = time.Duration(0)
  680. g.Group_send_times = 0
  681. g.Group_sleep_times = 0
  682. //low frequence users
  683. if i == k_val-1 {
  684. g.Group_low_frequence = true
  685. } else {
  686. g.Group_low_frequence = false
  687. }
  688. //outliers
  689. if i == k_val-2 {
  690. g.Group_outlier_flag = true
  691. } else {
  692. g.Group_outlier_flag = false
  693. }
  694. online_msg_in_each_group_in_each_round[g.Group_Id] = make([]int, 0)
  695. group_size_in_each_round[g.Group_Id] = make([]int, 0)
  696. group_list = append(group_list, &g)
  697. number_group_hashmap[i+k_val*groupSet_Number] = &g
  698. }
  699. for i := 0; i < len(m1)-1; i++ {
  700. if i < len(m2)-2 {
  701. intm2, err_m2 := strconv.Atoi(m2[i])
  702. if err_m2 != nil {
  703. log.Println("err_m2=", err_m2)
  704. return
  705. }
  706. user_group_hashmap[m1[i]] = intm2 + groupSet_Number*k_val
  707. middle_g, ok_middle_g := number_group_hashmap[intm2+groupSet_Number*k_val]
  708. if !ok_middle_g {
  709. log.Println("err,group id=", intm2+groupSet_Number*k_val)
  710. } else {
  711. middle_g.Group_Size++
  712. middle_g.Group_User_List = append(middle_g.Group_User_List, m1[i])
  713. }
  714. } else {
  715. user_group_hashmap[m1[i]] = (groupSet_Number+1)*k_val - 1
  716. middle_g, ok_middle_g := number_group_hashmap[(groupSet_Number+1)*k_val-1]
  717. if !ok_middle_g {
  718. log.Println("err,group id=", (groupSet_Number+1)*k_val-1)
  719. return
  720. } else {
  721. middle_g.Group_Size++
  722. middle_g.Group_User_List = append(middle_g.Group_User_List, m1[i])
  723. }
  724. }
  725. }
  726. sum := 0
  727. for i := 0; i < k_val; i++ {
  728. gg := number_group_hashmap[i+k_val*groupSet_Number]
  729. log.Println("id=", gg.Group_Id, "i+ k_val*groupSet_Number=", i+k_val*groupSet_Number, "group,size=", gg.Group_Size)
  730. sum = sum + gg.Group_Size
  731. }
  732. log.Println("groupSet_Number=", groupSet_Number)
  733. groupSet_Number++
  734. log.Println("update groupSet_Number,groupSet_Number=", groupSet_Number)
  735. return
  736. } else {
  737. return
  738. }
  739. }
  740. var group_active_function_db12_lock sync.Mutex
  741. var group_active_function_db12_rpop_lock sync.Mutex
  742. var eliminated_user_channel chan string
  743. var active_user_channel chan string
  744. var k_val int
  745. var group_Threshold_global int
  746. var group_Threshold_outlier int
  747. var delay_limitation int
  748. var outlier_delay_limitation int
  749. var Group_sleep_times_threshold int
  750. //process each group in each round, if it is active,eliminate the offline users
  751. func group_active_function(gg *Group, list_1 []string, list_2 []string) {
  752. defer log.Println("group_id=", gg.Group_Id, "group_active_function ends,Group_Size=", gg.Group_Size)
  753. log.Println("group_id=", gg.Group_Id, "group_active_function ends, Group_Size", gg.Group_Size)
  754. c_active, err_active := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
  755. if err_active != nil {
  756. log.Println("err,err_active=", err_active)
  757. return
  758. }
  759. //1 offline users
  760. gg.Group_online_sum_length = gg.Group_online_sum_length + len(list_2)*(round-gg.Group_start_round)
  761. gg.Group_Size_copy = gg.Group_Size_copy + len(list_2)
  762. for _, v := range list_2 {
  763. x, load_ok := Status.Load(v)
  764. if load_ok {
  765. if x != 2 {
  766. err_result = append(err_result, v)
  767. }
  768. }
  769. Status.Delete(v)
  770. eliminated_user_channel <- v
  771. c_active.Do("select", 12)
  772. group_active_function_db12_lock.Lock()
  773. _, err_del := c_active.Do("del", v)
  774. group_active_function_db12_lock.Unlock()
  775. if err_del != nil {
  776. log.Println("err_del=", err_del)
  777. return
  778. }
  779. }
  780. //2 online users
  781. gg.Group_Size = len(list_1)
  782. gg.Group_User_List = list_1
  783. c_active.Do("select", 12)
  784. for _, v := range list_1 {
  785. x, load_ok := Status.Load(v)
  786. if load_ok {
  787. if x != 2 {
  788. err_result = append(err_result, v)
  789. }
  790. }
  791. group_active_function_db12_rpop_lock.Lock()
  792. result, err_rpop := c_active.Do("rpop", v)
  793. group_active_function_db12_rpop_lock.Unlock()
  794. if err_rpop != nil {
  795. log.Println("err_rpop=", err_rpop)
  796. }
  797. res, err_string := redis.String(result, nil)
  798. if err_string != nil {
  799. log.Println("err_string=", err_string)
  800. }
  801. active_user_channel <- res
  802. mid := []byte(res)
  803. length := 0
  804. for i := len(res) - 1; ; i-- {
  805. if i < 0 {
  806. log.Println("err,res=", res, "len(res)=", len(res), "len(mid)=", len(mid), "mid=", mid)
  807. }
  808. if mid[i] == 125 { //125=}
  809. length = i
  810. break
  811. }
  812. }
  813. mid = mid[:length+1]
  814. gg.Group_send_times = gg.Group_send_times + 1
  815. var stru map[string]interface{}
  816. err_unmarshal := json.Unmarshal(mid, &stru)
  817. if err_unmarshal != nil {
  818. log.Println("err_unmarshal=", err_unmarshal, "len(mid=)", len(mid))
  819. }
  820. //time
  821. times, err_time := redis.Strings(stru["Megred_Timestamp"], nil)
  822. if err_time != nil {
  823. log.Println("err_time", err_time, "res=", res)
  824. }
  825. msg_time := times[0]
  826. loc, err_loc := time.LoadLocation("Local")
  827. if err_loc != nil {
  828. fmt.Println("err_loc=", err_loc)
  829. }
  830. //msg time
  831. the_time_of_msg, err_the_time_msg := time.ParseInLocation("20060102150405", msg_time, loc)
  832. if err_the_time_msg != nil {
  833. fmt.Println("err_the_time_msg=", err_the_time_msg)
  834. }
  835. the_time_of_agent, err_the_time_agent := time.ParseInLocation("20060102150405", "20121101120000", loc)
  836. if err_the_time_agent != nil {
  837. fmt.Println("err_the_time_agent=", err_the_time_agent)
  838. }
  839. send_out_timepoint := the_time_of_agent.Add(time.Duration(round+1) * time.Hour)
  840. duration := send_out_timepoint.Sub(the_time_of_msg)
  841. gg.Group_delay_duration_sum = gg.Group_delay_duration_sum + duration
  842. log.Println("group id=", gg.Group_Id, "the_time_of_msg=", the_time_of_msg, "send_out_timepoint=", send_out_timepoint, "duration=", duration)
  843. }
  844. }
  845. var each_group_process_lock sync.Mutex
  846. var each_group_process_wg sync.WaitGroup
  847. func each_group_process(g *Group) {
  848. defer each_group_process_wg.Done()
  849. c_db12, err_db12 := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
  850. if err_db12 != nil {
  851. log.Println("err_db12=", err_db12)
  852. return
  853. }
  854. c_db12.Do("select", 12)
  855. middle_active_list := make([]string, 0)
  856. middle_inactive_list := make([]string, 0)
  857. if g.Group_Size == 1 {
  858. middle_inactive_list = append(middle_inactive_list, g.Group_User_List[0])
  859. } else {
  860. for _, v := range g.Group_User_List {
  861. each_group_process_lock.Lock()
  862. agent_queue_length, err_agent_queue_length := c_db12.Do("llen", v)
  863. each_group_process_lock.Unlock()
  864. if err_agent_queue_length != nil {
  865. log.Println("err_agent_queue_length=", err_agent_queue_length)
  866. return
  867. }
  868. length, err_length := redis.Int(agent_queue_length, nil)
  869. if err_length != nil {
  870. log.Println("err_length=", err_length)
  871. return
  872. }
  873. if g.Group_outlier_flag { //outliers
  874. if length == 0 || length > outlier_delay_limitation {
  875. middle_inactive_list = append(middle_inactive_list, v)
  876. } else {
  877. middle_active_list = append(middle_active_list, v)
  878. }
  879. } else { //normal users
  880. if length == 0 || length > delay_limitation {
  881. middle_inactive_list = append(middle_inactive_list, v)
  882. } else {
  883. middle_active_list = append(middle_active_list, v)
  884. }
  885. }
  886. }
  887. }
  888. number := float64(0)
  889. //for outliers
  890. if g.Group_outlier_flag == true {
  891. number = float64(g.Group_Size*g.Group_Active_Threshold) / float64(100)
  892. }
  893. if g.Group_active {
  894. log.Println("group_id=", g.Group_Id, "threshold number=", number, "active user number", len(middle_active_list))
  895. group_active_function(g, middle_active_list, middle_inactive_list)
  896. } else {
  897. log.Println("group_id=", g.Group_Id, "threshold number=", number, "active user number", len(middle_active_list))
  898. }
  899. log.Println("group_id=", g.Group_Id, "is", g.Group_active, "active user number=", len(middle_active_list), "inactive user number=", len(middle_inactive_list), "group_size=", g.Group_Size, "Group_User_List length=", len(g.Group_User_List))
  900. }
  901. var db12_send_to_receiver_wg sync.WaitGroup
  902. var active_user_channel_closed bool
  903. //send online message to recipient
  904. func db12_send_to_receiver(c net.Conn) {
  905. defer db12_send_to_receiver_wg.Done()
  906. for {
  907. x, ok := <-active_user_channel
  908. if ok {
  909. jsons, err_jsons := json.Marshal(x)
  910. if err_jsons != nil {
  911. log.Println("db12 err_jsons=", err_jsons)
  912. }
  913. c.Write(jsons)
  914. } else {
  915. if active_user_channel_closed {
  916. return
  917. } else {
  918. continue
  919. }
  920. }
  921. }
  922. }
  923. var eliminated_user_channel_closed bool
  924. var online_eliminated_batchList_wg sync.WaitGroup
  925. //send the eliminated users namelist to client
  926. func online_eliminated_batchList() {
  927. defer online_eliminated_batchList_wg.Done()
  928. log.Println("online_eliminated_batchList starts")
  929. time.Sleep(time.Second * 2)
  930. httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002") //client address
  931. if err1 != nil {
  932. log.Println("conn client err, dial 192.168.32.144:20002 failed", err1)
  933. }
  934. number := 0
  935. eliminated_list := make([]string, 0)
  936. for {
  937. v, ok := <-eliminated_user_channel
  938. if ok {
  939. number++
  940. eliminated_list = append(eliminated_list, v)
  941. } else {
  942. if eliminated_user_channel_closed {
  943. break
  944. } else {
  945. continue
  946. }
  947. }
  948. }
  949. //someone is eliminated
  950. if len(eliminated_list) > 0 {
  951. sendToClient := ""
  952. for i, v := range eliminated_list {
  953. sendToClient = sendToClient + v + ","
  954. if (i%6000 == 0 && i != 0) || i == len(eliminated_list)-1 {
  955. sendToCli := []byte(sendToClient)
  956. httpConn_client.Write(sendToCli)
  957. sendToClient = ""
  958. }
  959. }
  960. time.Sleep(time.Second * 2)
  961. httpConn_client.Write([]byte("changeEnd"))
  962. } else {
  963. httpConn_client.Write([]byte("noChange"))
  964. }
  965. log.Println("eliminated user number=", number)
  966. }
  967. //send online message to recipient
  968. func online_cache_process() {
  969. c, c_err := net.Dial("tcp", "192.168.32.144:20001") //recipient
  970. log.Println("start online_cache_process")
  971. if c_err != nil {
  972. log.Println("online msg db12 err, dial 192.168.32.144:20001 failed", c_err)
  973. }
  974. active_user_channel_closed = false
  975. eliminated_user_channel_closed = false
  976. for i := 0; i < 10; i++ {
  977. db12_send_to_receiver_wg.Add(1)
  978. go db12_send_to_receiver(c)
  979. }
  980. online_eliminated_batchList_wg.Add(1)
  981. go online_eliminated_batchList()
  982. log.Println("group_list length", len(group_list))
  983. for _, v := range group_list {
  984. each_group_process_wg.Add(1)
  985. each_group_process(v)
  986. }
  987. each_group_process_wg.Wait()
  988. close(active_user_channel)
  989. active_user_channel_closed = true
  990. close(eliminated_user_channel)
  991. eliminated_user_channel_closed = true
  992. db12_send_to_receiver_wg.Wait()
  993. online_eliminated_batchList_wg.Wait()
  994. }
  995. func record_group_size() {
  996. for _, v := range group_list {
  997. group_size_in_each_round[v.Group_Id] = append(group_size_in_each_round[v.Group_Id], v.Group_Size)
  998. }
  999. }
  1000. func initMetric() {
  1001. log.Println("initMetric() starts")
  1002. Batch_list_threshold = 10000
  1003. channelSize = Batch_list_threshold + 10000
  1004. round = 0
  1005. lastLearningPhase_end_flag = true
  1006. packetLength = 3000
  1007. k_val = 14
  1008. group_Threshold_global = 30
  1009. group_Threshold_outlier = 30
  1010. delay_limitation = 2
  1011. outlier_delay_limitation = 2
  1012. Group_sleep_times_threshold = 5
  1013. }
  1014. //init some channels and parameters
  1015. func initNameListAndBatchList() {
  1016. log.Println("initNameListAndBatchList() starts")
  1017. Batch_List = make([]string, 0)
  1018. NameList = make([]string, 0)
  1019. number_group_hashmap = make(map[int]*Group)
  1020. groupSet_Number = 0
  1021. group_list = make([]*Group, 0)
  1022. online_msg_in_each_group_in_each_round = make(map[int][]int)
  1023. user_group_hashmap = make(map[string]int)
  1024. group_size_in_each_round = make(map[int][]int)
  1025. err_result = make([]string, 0)
  1026. arrival_phase_start_list = make([]int, 0)
  1027. average_arrival_length = make([]int, 0)
  1028. arrival_phase_number_list = make([]int, 0)
  1029. }
  1030. func generateLineItems(result []int) []opts.LineData {
  1031. items := make([]opts.LineData, 0)
  1032. for i := 0; i < len(result); i++ {
  1033. items = append(items, opts.LineData{Value: result[i]})
  1034. }
  1035. return items
  1036. }
  1037. var err_result []string
  1038. var duration int
  1039. func main() {
  1040. //log
  1041. logFile, err := os.OpenFile("log.txt", os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666)
  1042. if err != nil {
  1043. panic(err)
  1044. }
  1045. defer logFile.Close()
  1046. mw := io.MultiWriter(os.Stdout, logFile)
  1047. log.SetOutput(mw)
  1048. connID = 0
  1049. var connID int
  1050. //1,listen to client's request
  1051. listener, err := net.Listen("tcp", "192.168.32.144:20000")
  1052. if err != nil {
  1053. log.Println("start tcp server 192.168.32.144:20000 failed", err)
  1054. return
  1055. }
  1056. initNameListAndBatchList()
  1057. initMetric()
  1058. initDb()
  1059. for i := 0; i < 35; i++ {
  1060. log.Println("--------------------------------------------------the", i, "th round starts----------------------------------------------")
  1061. eliminated_user_channel = make(chan string, 100000)
  1062. active_user_channel = make(chan string, 100000)
  1063. online_msg_number = 0
  1064. online_msg_number_empty = 0
  1065. round = i
  1066. cov_rec = 0
  1067. real_rec = 0
  1068. conn, err := listener.Accept()
  1069. if err != nil {
  1070. log.Println("accept failed", err)
  1071. return
  1072. }
  1073. connID = connID + 1
  1074. log.Println("connID", connID)
  1075. peocessConn(conn, connID)
  1076. h24 := refresh_learningPhase_flag()
  1077. arrivalToLearn1()
  1078. learning_To_online_batchList(h24)
  1079. learning_function(h24)
  1080. online_msg_in_each_group_in_each_round_middle = make([]int, k_val*(groupSet_Number))
  1081. online_cache_process()
  1082. log.Println("online msg number=", online_msg_number, "online_msg_number_empty=", online_msg_number_empty)
  1083. record_group_size()
  1084. log.Println("NameList length", len(NameList))
  1085. log.Println("ov_rec,real_rec numbers ", cov_rec, real_rec)
  1086. log.Println("---------------------------------------------------next round---------------------------------------------------")
  1087. }
  1088. x_zhou := make([]int, len(group_list))
  1089. for i := 0; i < len(group_list); i++ {
  1090. x_zhou[i] = i
  1091. }
  1092. ave_groupsize := make([]int, len(group_size_in_each_round[0]))
  1093. for i, v := range group_list {
  1094. fmt.Println(i, v)
  1095. line := charts.NewLine()
  1096. line.SetGlobalOptions(
  1097. charts.WithInitializationOpts(opts.Initialization{Theme: types.ThemeWesteros}),
  1098. charts.WithTitleOpts(opts.Title{
  1099. Title: ("group_id=" + strconv.Itoa(v.Group_Id)),
  1100. Subtitle: "k_val=" + strconv.Itoa(k_val) + ";delay_limitation=" + strconv.Itoa(delay_limitation) + ";Group_Active_Threshold=" + strconv.Itoa(v.Group_Active_Threshold),
  1101. }))
  1102. log.Println("group_id=", i, "group,group_size changes as", group_size_in_each_round[v.Group_Id])
  1103. //average group size
  1104. if !v.Group_low_frequence {
  1105. for i, v := range group_size_in_each_round[v.Group_Id] {
  1106. if v == 0 {
  1107. ave_groupsize[i] = ave_groupsize[i] + 1
  1108. } else {
  1109. ave_groupsize[i] = ave_groupsize[i] + v
  1110. }
  1111. }
  1112. }
  1113. log.Println("group_id=", i, "group,in each round,active user numbers are", online_msg_in_each_group_in_each_round[v.Group_Id])
  1114. line.SetXAxis(x_zhou).
  1115. AddSeries("Category A", generateLineItems(online_msg_in_each_group_in_each_round[v.Group_Id])).
  1116. AddSeries("Category B", generateLineItems(group_size_in_each_round[v.Group_Id])).
  1117. SetSeriesOptions(charts.WithLineChartOpts(opts.LineChart{Step: true, ConnectNulls: true}))
  1118. name1 := "group_"
  1119. name2 := ".html"
  1120. name := name1 + strconv.Itoa(i) + name2
  1121. w, _ := os.Create(name)
  1122. line.Render(w)
  1123. }
  1124. log.Println(err_result)
  1125. log.Println("k_val=", k_val, "duration=", duration)
  1126. log.Println("arrival_phase_number, average_arrival_length", arrival_phase_number_list, average_arrival_length)
  1127. sum_number := 0
  1128. sum_length := 0
  1129. sum_duration := time.Duration(0)
  1130. sum_send_times := 0
  1131. for _, v := range group_list {
  1132. if !v.Group_low_frequence {
  1133. //average arrival length
  1134. log.Println(v.Group_Id, "Group_size_copy=", v.Group_Size_copy, "online sum length", v.Group_online_sum_length)
  1135. sum_length = v.Group_online_sum_length + sum_length
  1136. sum_number = sum_number + v.Group_Size_copy
  1137. //online msg' latency
  1138. sum_duration = sum_duration + v.Group_delay_duration_sum
  1139. sum_send_times = sum_send_times + v.Group_send_times
  1140. log.Println("group id = ", v.Group_Id, "group,online msg average latency=", v.Group_delay_duration_sum/time.Duration(v.Group_send_times), "Group_outlier_flag", v.Group_outlier_flag)
  1141. }
  1142. }
  1143. log.Println("average online length=", float64(sum_length)/float64(sum_number), "user sum_number", sum_number, "online phase total length", sum_length, "online average latency", sum_duration/time.Duration(sum_send_times))
  1144. ave_groupsize_double := make([]float64, len(ave_groupsize))
  1145. for i, v := range ave_groupsize {
  1146. ave_groupsize_double[i] = math.Round((float64(v) / float64(k_val-1)))
  1147. }
  1148. log.Println("average group size=", ave_groupsize_double)
  1149. log.Println("sum of group size=", ave_groupsize)
  1150. log.Println("delay_limitation=", delay_limitation, "outlier_delay_limitation", outlier_delay_limitation, "group_Threshold_global", group_Threshold_global, "Group_sleep_times_threshold", Group_sleep_times_threshold)
  1151. }