main.go 37 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "log"
  8. "math"
  9. "net"
  10. "os"
  11. "os/exec"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. // "github.com/go-echarts/go-echarts/v2/charts"
  17. // "github.com/go-echarts/go-echarts/v2/opts"
  18. // "github.com/go-echarts/go-echarts/v2/types"
  19. "github.com/go-echarts/go-echarts/v2/charts"
  20. "github.com/go-echarts/go-echarts/v2/opts"
  21. "github.com/go-echarts/go-echarts/v2/types"
  22. "github.com/gomodule/redigo/redis"
  23. )
  24. var channelSize int
  25. var connID int
  26. type Rec struct {
  27. Megred_Timestamp []string
  28. Merged_Hashtags []string
  29. Megred_User_Id string
  30. Megred_Ip string
  31. Cover_Msg string
  32. }
  33. var arrival_phase_start_list []int
  34. var arrival_phase_start_list_lock sync.Mutex
  35. var arrival_phase_number_list []int
  36. var arrival_phase_number_list_lock sync.Mutex
  37. var average_arrival_length []int
  38. var Status sync.Map
  39. var NameList []string
  40. var NameList_Append_Lock sync.Mutex
  41. var Batch_List []string
  42. var Batch_List_Append_Lock sync.Mutex
  43. var Batch_list_threshold int
  44. var putInChannel chan string
  45. var receive_wg sync.WaitGroup
  46. var peocessConn_wg sync.WaitGroup
  47. var uid_channel chan string
  48. var uip_channel chan string
  49. var db14_agentqueue_key chan string
  50. var db14_agentqueue_val chan string
  51. var db14_agentqueue_Lock sync.Mutex
  52. var db13_learning_key chan string
  53. var db13_learning_val chan int
  54. var db13_learning_Lock sync.Mutex
  55. var db12_online_key chan string
  56. var db12_online_val chan string
  57. var db12_online_Lock sync.Mutex
  58. var online_msg_number int
  59. var online_msg_number_empty int
  60. var online_msg_in_each_group_in_each_round map[int][]int
  61. var group_size_in_each_round map[int][]int
  62. var online_msg_in_each_group_in_each_round_middle_wg sync.Mutex
  63. var online_msg_in_each_group_in_each_round_middle []int
  64. var user_group_hashmap map[string]int
  65. var sendDBToReceiver chan string
  66. var cleanDBToReceiver_wg sync.WaitGroup
  67. var sendDBToReceiver_lock sync.Mutex
  68. var cov_rec int
  69. var real_rec int
  70. var cov_lock sync.Mutex
  71. var real_lock sync.Mutex
  72. var packetLength int
  73. var total_cover int
  74. var total_cover_lock sync.Mutex
  75. var groupSet_Number int
  76. var number_group_hashmap map[int]*Group
  77. type Group struct {
  78. Group_Id int
  79. Group_User_List []string
  80. Group_Size int
  81. Group_Size_copy int
  82. Group_Active_Threshold int
  83. Group_active bool
  84. Group_low_frequence bool
  85. Group_start_round int
  86. Group_online_sum_length int
  87. Group_delay_duration_sum time.Duration
  88. Group_outlier_flag bool
  89. Group_send_times int
  90. Group_sleep_times int
  91. Group_cover int
  92. Group_delay_in_each_round []time.Duration
  93. }
  94. var group_list []*Group
  95. func receive(c net.Conn) {
  96. defer receive_wg.Done()
  97. for {
  98. a, ok_return := <-putInChannel
  99. if !ok_return {
  100. return
  101. }
  102. mid := []byte(a)
  103. length := 0
  104. if a == "" {
  105. continue
  106. }
  107. for i := len(a) - 1; ; i-- {
  108. if i < 0 {
  109. log.Println("err,a=", a, "len(a)=", len(a), "len(mid)=", len(mid), "mid=", mid)
  110. }
  111. if mid[i] == 125 { //125=}
  112. length = i
  113. break
  114. }
  115. }
  116. mid = mid[:length+1]
  117. var stru map[string]interface{}
  118. err_unmarshal := json.Unmarshal(mid, &stru)
  119. if err_unmarshal != nil {
  120. log.Println("json,err_unmarshal=", err_unmarshal, "len(mid=)", len(mid), "len(a)=", len(a))
  121. }
  122. var structure Rec
  123. //1,time
  124. times, err_time := redis.Strings(stru["Megred_Timestamp"], nil)
  125. if err_time != nil {
  126. log.Println("err_time", err_time, "a=", a)
  127. }
  128. structure.Megred_Timestamp = times
  129. //2,msg
  130. msgs, err_msgs := redis.Strings(stru["Megred_Hashtags"], nil)
  131. if err_msgs != nil {
  132. log.Println("err_msgs", err_msgs, "a", a)
  133. }
  134. structure.Merged_Hashtags = msgs
  135. //3,uid
  136. uid, err_uid := redis.String(stru["Megred_User_Id"], nil)
  137. if err_uid != nil {
  138. log.Println("err_uid", err_uid, "a", a)
  139. }
  140. structure.Megred_User_Id = uid
  141. //4,ip
  142. ip, err_ip := redis.String(stru["Megred_Ip"], nil)
  143. if err_ip != nil {
  144. log.Println("err_ip", err_ip, "a", a)
  145. }
  146. structure.Megred_Ip = ip
  147. //5.cover
  148. cov, err_cover := redis.String(stru["Cover_Msg"], nil)
  149. if err_cover != nil {
  150. log.Println("err_cover", err_cover)
  151. break
  152. }
  153. structure.Cover_Msg = cov
  154. //different status,sifferent db(process workflow)
  155. //1,put into db14
  156. staa, ok := Status.Load(structure.Megred_User_Id)
  157. if !ok {
  158. if structure.Megred_User_Id == "" {
  159. continue
  160. }
  161. Status.Store(structure.Megred_User_Id, 0)
  162. NameList_Append_Lock.Lock()
  163. NameList = append(NameList, structure.Megred_User_Id)
  164. NameList_Append_Lock.Unlock()
  165. Batch_List_Append_Lock.Lock()
  166. Batch_List = append(Batch_List, structure.Megred_User_Id)
  167. Batch_List_Append_Lock.Unlock()
  168. db14_agentqueue_Lock.Lock()
  169. db14_agentqueue_key <- structure.Megred_User_Id
  170. db14_agentqueue_val <- a
  171. db14_agentqueue_Lock.Unlock()
  172. arrival_phase_start_list_lock.Lock()
  173. arrival_phase_start_list = append(arrival_phase_start_list, round)
  174. arrival_phase_start_list_lock.Unlock()
  175. continue
  176. } else {
  177. //a,arrival
  178. if staa == 0 {
  179. db14_agentqueue_Lock.Lock()
  180. db14_agentqueue_key <- structure.Megred_User_Id
  181. db14_agentqueue_val <- a
  182. db14_agentqueue_Lock.Unlock()
  183. continue
  184. }
  185. //b,learning
  186. if staa == 1 {
  187. if structure.Cover_Msg == "yes" {
  188. db13_learning_Lock.Lock()
  189. db13_learning_key <- structure.Megred_User_Id
  190. db13_learning_val <- 0
  191. db13_learning_Lock.Unlock()
  192. cov_lock.Lock()
  193. cov_rec++
  194. cov_lock.Unlock()
  195. } else {
  196. db13_learning_Lock.Lock()
  197. db13_learning_key <- structure.Megred_User_Id
  198. db13_learning_val <- 1
  199. db13_learning_Lock.Unlock()
  200. real_lock.Lock()
  201. real_rec++
  202. real_lock.Unlock()
  203. }
  204. c.Write(mid)
  205. continue
  206. }
  207. //c,online
  208. if staa == 2 {
  209. db12_online_Lock.Lock()
  210. db12_online_key <- structure.Megred_User_Id
  211. db12_online_val <- a
  212. online_msg_number++
  213. db12_online_Lock.Unlock()
  214. online_msg_in_each_group_in_each_round_middle_wg.Lock()
  215. gid := user_group_hashmap[structure.Megred_User_Id]
  216. online_msg_in_each_group_in_each_round_middle[gid]++
  217. online_msg_in_each_group_in_each_round_middle_wg.Unlock()
  218. if structure.Cover_Msg == "yes" {
  219. total_cover_lock.Lock()
  220. total_cover++
  221. total_cover_lock.Unlock()
  222. }
  223. continue
  224. }
  225. }
  226. }
  227. }
  228. //db14,same with v1
  229. var db14_lpush_wg sync.WaitGroup
  230. var db14_lpush_lock sync.Mutex
  231. func db14_lpush() {
  232. defer db14_lpush_wg.Done()
  233. arrival_msg_num := 0
  234. log.Println("db14_rpush() starts")
  235. c_db14, _ := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
  236. c_db14.Do("select", 14)
  237. for {
  238. db14_lpush_lock.Lock()
  239. x, ok1 := <-db14_agentqueue_key
  240. y, ok2 := <-db14_agentqueue_val
  241. db14_lpush_lock.Unlock()
  242. if ok1 && ok2 {
  243. c_db14.Do("lpush", x, y)
  244. arrival_msg_num++
  245. }
  246. if (!ok1 || !ok2) && db13_db14_db12_waitForReceive_flag {
  247. log.Println("db14_lpush() ends,arrival_msg_num=", arrival_msg_num)
  248. return
  249. }
  250. }
  251. }
  252. //db13,same with v1
  253. var db13_rpush_wg sync.WaitGroup
  254. var db13_rpush_lock sync.Mutex
  255. func db13_rpush() {
  256. defer db13_rpush_wg.Done()
  257. log.Println("db13_rpush() starts")
  258. real := 0
  259. cov := 0
  260. c_db13, _ := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
  261. c_db13.Do("select", 13)
  262. for {
  263. db13_rpush_lock.Lock()
  264. x, ok1 := <-db13_learning_key
  265. y, ok2 := <-db13_learning_val
  266. db13_rpush_lock.Unlock()
  267. if ok1 && ok2 {
  268. if y == 0 {
  269. cov = cov + 1
  270. } else {
  271. real = real + 1
  272. }
  273. c_db13.Do("rpush", x, y)
  274. }
  275. if db13_db14_db12_waitForReceive_flag && (!ok1 || !ok2) {
  276. log.Println("db13_rpush()ends, real,cov", real, cov, "total numbers", real+cov)
  277. return
  278. }
  279. }
  280. }
  281. //db12,same with v1
  282. var db12_lpush_wg sync.WaitGroup
  283. var db12_lpush_lock sync.Mutex
  284. var test_list []string
  285. func db12_lpush() {
  286. defer db12_lpush_wg.Done()
  287. online_msg := 0
  288. log.Println("db12_lpush()starts")
  289. c_db12, _ := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
  290. c_db12.Do("select", 12)
  291. for {
  292. db12_lpush_lock.Lock()
  293. x, ok1 := <-db12_online_key
  294. test_list = append(test_list, x)
  295. y, ok2 := <-db12_online_val
  296. db12_lpush_lock.Unlock()
  297. if ok1 && ok2 {
  298. c_db12.Do("lpush", x, y)
  299. online_msg++
  300. }
  301. if db13_db14_db12_waitForReceive_flag && (!ok1 || !ok2) {
  302. log.Println("db12_lpush() ends,online_msg", online_msg)
  303. return
  304. }
  305. }
  306. }
  307. //same with v1
  308. func initDb() {
  309. log.Println("initDb()")
  310. c, c_err := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
  311. if c_err != nil {
  312. log.Println("c_err=", c_err)
  313. }
  314. c.Do("select", 14)
  315. c.Do("Flushdb")
  316. c.Do("select", 13)
  317. c.Do("Flushdb")
  318. c.Do("select", 12)
  319. c.Do("Flushdb")
  320. }
  321. //same with v1
  322. func initChannelAndListAndMap() {
  323. log.Println("initChannelAndListAndMap starts")
  324. putInChannel = make(chan string, channelSize)
  325. db14_agentqueue_key = make(chan string, channelSize)
  326. db14_agentqueue_val = make(chan string, channelSize)
  327. db13_learning_key = make(chan string, channelSize)
  328. db13_learning_val = make(chan int, channelSize)
  329. db12_online_key = make(chan string, channelSize)
  330. db12_online_val = make(chan string, channelSize)
  331. }
  332. var db13_db14_db12_waitForReceive_flag bool
  333. //same with v1
  334. func peocessConn(conn net.Conn, connID int) {
  335. db13_db14_db12_waitForReceive_flag = false
  336. log.Println("peocessConn starts")
  337. initChannelAndListAndMap()
  338. db14_lpush_wg.Add(1)
  339. go db14_lpush()
  340. db13_rpush_wg.Add(1)
  341. go db13_rpush()
  342. db12_lpush_wg.Add(1)
  343. go db12_lpush()
  344. c, c_err := net.Dial("tcp", "192.168.32.144:20001") //recipient address
  345. if c_err != nil {
  346. log.Println("conn err,dial 192.168.32.144:20001 failed", c_err)
  347. }
  348. for i := 0; i < 500; i++ {
  349. receive_wg.Add(1)
  350. go receive(c)
  351. }
  352. var tmp [3000]byte
  353. index := 0
  354. total_msg_number := 0
  355. for {
  356. total_msg_number++
  357. _, err := conn.Read(tmp[:])
  358. if err != nil {
  359. log.Println("read from conn failed", err)
  360. return
  361. }
  362. if string(tmp[:8]) == "roundEnd" {
  363. log.Println("client ends the conn,msg=", total_msg_number)
  364. break
  365. }
  366. if tmp[0] != 123 {
  367. var corrext_tmp []byte
  368. var corrext_follow []byte
  369. var correct_length int
  370. for i, v := range tmp[:] {
  371. if v == 123 {
  372. correct_length = i
  373. corrext_follow = make([]byte, correct_length)
  374. _, err_follow := conn.Read(corrext_follow[:])
  375. if err_follow != nil {
  376. log.Println("read from conn failed,err_follow=", err_follow)
  377. return
  378. }
  379. corrext_tmp = append(tmp[i:], corrext_follow[:]...)
  380. if len(corrext_tmp) != packetLength {
  381. log.Println("err", len(corrext_tmp))
  382. }
  383. break
  384. }
  385. }
  386. putInChannel <- string(corrext_tmp[:])
  387. index++
  388. log.Println("total_msg_number=", total_msg_number)
  389. continue
  390. }
  391. putInChannel <- string(tmp[:])
  392. }
  393. close(putInChannel)
  394. receive_wg.Wait()
  395. //decide if each anonymity set is oniline
  396. if len(group_list) > 0 {
  397. for _, v := range group_list {
  398. online_msg_in_each_group_in_each_round[v.Group_Id] = append(online_msg_in_each_group_in_each_round[v.Group_Id], online_msg_in_each_group_in_each_round_middle[v.Group_Id])
  399. number := float64(0)
  400. if v.Group_outlier_flag {
  401. number = float64(group_list[v.Group_Id].Group_Size*group_Threshold_outlier) / float64(100)
  402. } else {
  403. number = float64(group_list[v.Group_Id].Group_Size*group_list[v.Group_Id].Group_Active_Threshold) / float64(100)
  404. }
  405. if number >= float64(online_msg_in_each_group_in_each_round_middle[v.Group_Id]) {
  406. group_list[v.Group_Id].Group_active = false
  407. group_list[v.Group_Id].Group_sleep_times++
  408. } else {
  409. group_list[v.Group_Id].Group_active = true
  410. group_list[v.Group_Id].Group_sleep_times = 0
  411. }
  412. }
  413. }
  414. db13_db14_db12_waitForReceive_flag = true
  415. close(db14_agentqueue_key)
  416. close(db14_agentqueue_val)
  417. close(db13_learning_key)
  418. close(db13_learning_val)
  419. close(db12_online_key)
  420. close(db12_online_val)
  421. db13_rpush_wg.Wait()
  422. db14_lpush_wg.Wait()
  423. db12_lpush_wg.Wait()
  424. log.Println("peocessConn ends,NameList length=", len(NameList), "Batch_List length", len(Batch_List))
  425. }
  426. //same with v1
  427. func cleanDBToReceiver(httpConn net.Conn) {
  428. defer cleanDBToReceiver_wg.Done()
  429. time.Sleep(time.Millisecond * 300)
  430. c, _ := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
  431. c.Do("select", 14)
  432. for {
  433. x, ok := <-sendDBToReceiver
  434. if ok {
  435. sendDBToReceiver_lock.Lock()
  436. queueLen, _ := c.Do("llen", x)
  437. ql, err_ql := redis.Int(queueLen, nil)
  438. if err_ql != nil {
  439. log.Println("db14 err_ql=", err_ql)
  440. }
  441. for i := 0; i < ql; i++ {
  442. result, err_rpop := c.Do("rpop", x)
  443. if err_rpop != nil {
  444. log.Println("db14 err_rpop=", err_rpop)
  445. }
  446. res, err_res := redis.String(result, nil)
  447. if err_res != nil {
  448. log.Println("db14 err_res=", err_res)
  449. }
  450. jsons, err_jsons := json.Marshal(res)
  451. if err_jsons != nil {
  452. log.Println("db14 err_jsons=", err_jsons)
  453. }
  454. httpConn.Write(jsons)
  455. }
  456. sendDBToReceiver_lock.Unlock()
  457. } else {
  458. return
  459. }
  460. }
  461. }
  462. var learningphase_start_round int
  463. var round int
  464. var last_Batch_List []string
  465. var last_Batch_List_to_online []string
  466. var lastLearningPhase_end_flag bool
  467. //same with v1
  468. func arrivalToLearn1() bool {
  469. log.Println("arrivalToLearn1() starts")
  470. time.Sleep(time.Millisecond * 300)
  471. if len(Batch_List) >= Batch_list_threshold {
  472. if !lastLearningPhase_end_flag {
  473. httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002")
  474. if err1 != nil {
  475. log.Println("conn err dial 192.168.32.144:20002 failed", err1)
  476. }
  477. sendToCli := []byte("noChange")
  478. httpConn_client.Write(sendToCli)
  479. return false
  480. }
  481. learningphase_start_round = round
  482. last_Batch_List = make([]string, len(Batch_List))
  483. copy(last_Batch_List, Batch_List)
  484. httpConn_receiver, err2 := net.Dial("tcp", "192.168.32.144:20001")
  485. if err2 != nil {
  486. log.Println("conn err,dial 192.168.32.144:20001 failed", err2)
  487. }
  488. sendDBToReceiver = make(chan string, channelSize)
  489. for i := 0; i < 100; i++ {
  490. cleanDBToReceiver_wg.Add(1)
  491. go cleanDBToReceiver(httpConn_receiver)
  492. }
  493. middle_sum_number := 0
  494. for _, v := range arrival_phase_start_list {
  495. middle_sum_number = middle_sum_number + round - v
  496. }
  497. arrival_phase_number_list_lock.Lock()
  498. average_arrival_length = append(average_arrival_length, middle_sum_number/len(arrival_phase_start_list))
  499. arrival_phase_number_list = append(arrival_phase_number_list, len(arrival_phase_start_list))
  500. arrival_phase_number_list_lock.Unlock()
  501. arrival_phase_start_list = make([]int, 0)
  502. httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002") //client address
  503. if err1 != nil {
  504. log.Println("conn err dial 192.168.32.144:20002 failed", err1)
  505. }
  506. sendToClient := ""
  507. for i, v := range Batch_List {
  508. sendDBToReceiver <- v
  509. Status.Store(v, 1)
  510. sendToClient = sendToClient + v + ","
  511. if (i%6000 == 0 && i != 0) || i == len(Batch_List)-1 {
  512. sendToCli := []byte(sendToClient)
  513. httpConn_client.Write(sendToCli)
  514. sendToClient = ""
  515. }
  516. }
  517. log.Println("Batch_List length", len(Batch_List))
  518. close(sendDBToReceiver)
  519. time.Sleep(time.Second * 2)
  520. httpConn_client.Write([]byte("changeEnd"))
  521. Batch_List = make([]string, 0)
  522. cleanDBToReceiver_wg.Wait()
  523. c, c_err := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
  524. if c_err != nil {
  525. log.Println("c_err=", c_err)
  526. }
  527. c.Do("select", 14)
  528. c.Do("flushdb")
  529. lastLearningPhase_end_flag = false
  530. return true
  531. } else {
  532. httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002")
  533. if err1 != nil {
  534. log.Println("conn err dial 192.168.32.144:20002 failed", err1)
  535. }
  536. sendToCli := []byte("noChange")
  537. httpConn_client.Write(sendToCli)
  538. log.Println("no changed status")
  539. return false
  540. }
  541. }
  542. //same with v1
  543. func refresh_learningPhase_flag() bool {
  544. defer log.Println("refresh_learningPhase_flag ends")
  545. log.Println("refresh_learningPhase_flag starts")
  546. if round-learningphase_start_round == 24 {
  547. log.Println("round=", round, "learningphase_start_round=", learningphase_start_round)
  548. lastLearningPhase_end_flag = true
  549. if round >= 33 {
  550. lastLearningPhase_end_flag = false
  551. return false
  552. }
  553. last_Batch_List_to_online = make([]string, len(last_Batch_List))
  554. copy(last_Batch_List_to_online, last_Batch_List)
  555. return true
  556. }
  557. return false
  558. }
  559. //same with v1
  560. func learning_To_online_batchList(flag bool) {
  561. log.Println("learning_To_online starts,flag=", flag)
  562. time.Sleep(time.Second * 1)
  563. if !flag {
  564. httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002")
  565. if err1 != nil {
  566. log.Println("conn err dial 192.168.32.144:20002 failed", err1)
  567. }
  568. sendToCli := []byte("noChange")
  569. httpConn_client.Write(sendToCli)
  570. return
  571. } else {
  572. httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002")
  573. if err1 != nil {
  574. log.Println("conn err dial 192.168.32.144:20002 failed", err1)
  575. }
  576. sendToClient := ""
  577. for i, v := range last_Batch_List_to_online {
  578. Status.Store(v, 2)
  579. sendToClient = sendToClient + v + ","
  580. if (i%6000 == 0 && i != 0) || i == len(last_Batch_List_to_online)-1 {
  581. sendToCli := []byte(sendToClient)
  582. httpConn_client.Write(sendToCli)
  583. sendToClient = ""
  584. }
  585. }
  586. time.Sleep(time.Second * 1)
  587. httpConn_client.Write([]byte("changeEnd"))
  588. }
  589. }
  590. //same with v1
  591. func learning_function(flag bool) {
  592. defer log.Println("learning_function ends")
  593. log.Println("learning_function starts")
  594. if flag {
  595. log.Println("round=", round, "learningphase_start_round=", learningphase_start_round)
  596. //output.sh
  597. _, err := exec.Command("/bin/bash", "/home/it/middle_data/output.sh").CombinedOutput()
  598. if err != nil {
  599. log.Println("output err,cmd.Output:", err)
  600. return
  601. }
  602. log.Println("output.sh ends")
  603. starttime := time.Now()
  604. //final.py
  605. cmd, err_cmd := exec.Command("python3", "/home/it/learning-phase/final.py").CombinedOutput()
  606. if err_cmd != nil {
  607. log.Println("cmd.Output err,err_cmd=", err_cmd)
  608. return
  609. }
  610. log.Println("python result", string(cmd))
  611. cost := int(time.Since(starttime) / time.Second)
  612. duration = cost
  613. //combine.sh
  614. _, err_combine := exec.Command("/bin/bash", "/home/it/middle_data/combine.sh").CombinedOutput()
  615. if err_combine != nil {
  616. log.Println("combine err,cmd.Output:", err_combine)
  617. return
  618. }
  619. //3,save the result
  620. filePath1 := "/home/it/middle_data/filtered_good_name_list.txt"
  621. content1, err1 := ioutil.ReadFile(filePath1)
  622. if err1 != nil {
  623. panic(err1)
  624. }
  625. m1 := strings.Split(string(content1), "\n")
  626. filePath2 := "/home/it/middle_data/groupingResult.txt"
  627. content2, err2 := ioutil.ReadFile(filePath2)
  628. if err2 != nil {
  629. panic(err1)
  630. }
  631. m2 := strings.Split(string(content2), "\n")
  632. log.Println("m1 length=", len(m1), "m2 length=", len(m2))
  633. //create groups
  634. for i := 0; i < k_val; i++ {
  635. var g Group
  636. g.Group_Id = i + k_val*groupSet_Number
  637. g.Group_Active_Threshold = group_Threshold_global
  638. g.Group_Size = 0
  639. g.Group_Size_copy = 0
  640. g.Group_active = false
  641. g.Group_User_List = make([]string, 0)
  642. g.Group_start_round = round
  643. g.Group_online_sum_length = 0
  644. g.Group_delay_duration_sum = time.Duration(0)
  645. g.Group_send_times = 0
  646. g.Group_sleep_times = 0
  647. g.Group_cover = 0
  648. g.Group_delay_in_each_round = make([]time.Duration, 0)
  649. //low frequence
  650. if i == k_val-1 {
  651. g.Group_low_frequence = true
  652. } else {
  653. g.Group_low_frequence = false
  654. }
  655. //outlier
  656. if i == k_val-2 {
  657. g.Group_outlier_flag = true
  658. } else {
  659. g.Group_outlier_flag = false
  660. }
  661. //some init works
  662. online_msg_in_each_group_in_each_round[g.Group_Id] = make([]int, 0)
  663. group_size_in_each_round[g.Group_Id] = make([]int, 0)
  664. group_list = append(group_list, &g)
  665. number_group_hashmap[i+k_val*groupSet_Number] = &g
  666. }
  667. for i := 0; i < len(m1)-1; i++ {
  668. if i < len(m2)-2 {
  669. intm2, err_m2 := strconv.Atoi(m2[i])
  670. if err_m2 != nil {
  671. log.Println("err_m2=", err_m2)
  672. return
  673. }
  674. user_group_hashmap[m1[i]] = intm2 + groupSet_Number*k_val
  675. middle_g, ok_middle_g := number_group_hashmap[intm2+groupSet_Number*k_val]
  676. if !ok_middle_g {
  677. log.Println("err,group id=", intm2+groupSet_Number*k_val)
  678. } else {
  679. middle_g.Group_Size++
  680. middle_g.Group_User_List = append(middle_g.Group_User_List, m1[i])
  681. }
  682. } else {
  683. user_group_hashmap[m1[i]] = (groupSet_Number+1)*k_val - 1
  684. middle_g, ok_middle_g := number_group_hashmap[(groupSet_Number+1)*k_val-1]
  685. if !ok_middle_g {
  686. log.Println("err,group id=", (groupSet_Number+1)*k_val-1)
  687. return
  688. } else {
  689. middle_g.Group_Size++
  690. middle_g.Group_User_List = append(middle_g.Group_User_List, m1[i])
  691. }
  692. }
  693. }
  694. sum := 0
  695. for i := 0; i < k_val; i++ {
  696. gg := number_group_hashmap[i+k_val*groupSet_Number]
  697. log.Println("id=", gg.Group_Id, "i+ k_val*groupSet_Number=", i+k_val*groupSet_Number, "的group,其size=", gg.Group_Size)
  698. sum = sum + gg.Group_Size
  699. }
  700. log.Println("groupSet_Number=", groupSet_Number)
  701. groupSet_Number++
  702. log.Println("groupSet_Number=", groupSet_Number)
  703. return
  704. } else {
  705. return
  706. }
  707. }
  708. var group_active_function_db12_lock sync.Mutex
  709. var group_active_function_db12_rpop_lock sync.Mutex
  710. var eliminated_user_channel chan string
  711. var active_user_channel chan string
  712. var k_val int
  713. var group_Threshold_global int
  714. var group_Threshold_outlier int
  715. var delay_limitation int
  716. var outlier_delay_limitation int
  717. var Group_sleep_times_threshold int
  718. //same with v1
  719. func group_active_function(gg *Group, list_1 []string, list_2 []string) {
  720. defer log.Println("group_id=", gg.Group_Id, "group_active_function ends,Group_Size", gg.Group_Size)
  721. log.Println("group_id=", gg.Group_Id, "group, group_active_function starts ,Group_Size", gg.Group_Size)
  722. c_active, err_active := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
  723. if err_active != nil {
  724. log.Println("err_active=", err_active)
  725. return
  726. }
  727. gg.Group_online_sum_length = gg.Group_online_sum_length + len(list_2)*(round-gg.Group_start_round)
  728. gg.Group_Size_copy = gg.Group_Size_copy + len(list_2)
  729. for _, v := range list_2 {
  730. x, load_ok := Status.Load(v)
  731. if load_ok {
  732. if x != 2 {
  733. err_result = append(err_result, v)
  734. }
  735. }
  736. Status.Delete(v)
  737. eliminated_user_channel <- v
  738. c_active.Do("select", 12)
  739. group_active_function_db12_lock.Lock()
  740. _, err_del := c_active.Do("del", v)
  741. group_active_function_db12_lock.Unlock()
  742. if err_del != nil {
  743. log.Println("err_del=", err_del)
  744. return
  745. }
  746. }
  747. gg.Group_Size = len(list_1)
  748. gg.Group_User_List = list_1
  749. c_active.Do("select", 12)
  750. each_round_delay := time.Duration(0)
  751. not_cover_number := 0
  752. for _, v := range list_1 {
  753. x, load_ok := Status.Load(v)
  754. if load_ok {
  755. if x != 2 {
  756. err_result = append(err_result, v)
  757. }
  758. }
  759. group_active_function_db12_rpop_lock.Lock()
  760. result, err_rpop := c_active.Do("rpop", v)
  761. group_active_function_db12_rpop_lock.Unlock()
  762. if err_rpop != nil {
  763. log.Println("err_rpop=", err_rpop)
  764. }
  765. res, err_string := redis.String(result, nil)
  766. if err_string != nil {
  767. log.Println("err_string=", err_string)
  768. }
  769. active_user_channel <- res
  770. //calculate the latency
  771. mid := []byte(res)
  772. length := 0
  773. for i := len(res) - 1; ; i-- {
  774. if i < 0 {
  775. log.Println("res,res=", res, "len(res)=", len(res), "len(mid)=", len(mid), "mid=", mid)
  776. }
  777. if mid[i] == 125 { //125=}
  778. length = i
  779. break
  780. }
  781. }
  782. mid = mid[:length+1]
  783. var stru map[string]interface{}
  784. err_unmarshal := json.Unmarshal(mid, &stru)
  785. if err_unmarshal != nil {
  786. log.Println("err_unmarshal=", err_unmarshal, "len(mid=)", len(mid))
  787. }
  788. cov, err_cover := redis.String(stru["Cover_Msg"], nil)
  789. if err_cover != nil {
  790. log.Println("err_cover", err_cover)
  791. break
  792. }
  793. if cov == "yes" {
  794. continue
  795. }
  796. not_cover_number = not_cover_number + 1
  797. gg.Group_send_times = gg.Group_send_times + 1
  798. times, err_time := redis.Strings(stru["Megred_Timestamp"], nil)
  799. if err_time != nil {
  800. log.Println("err_time", err_time, "a=", res)
  801. }
  802. msg_time := times[0]
  803. loc, err_loc := time.LoadLocation("Local")
  804. if err_loc != nil {
  805. fmt.Println("err_loc=", err_loc)
  806. }
  807. the_time_of_msg, err_the_time_msg := time.ParseInLocation("20060102150405", msg_time, loc)
  808. if err_the_time_msg != nil {
  809. fmt.Println("err_the_time_msg=", err_the_time_msg)
  810. }
  811. the_time_of_agent, err_the_time_agent := time.ParseInLocation("20060102150405", "20121101120000", loc)
  812. if err_the_time_agent != nil {
  813. fmt.Println("err_the_time_agent=", err_the_time_agent)
  814. }
  815. send_out_timepoint := the_time_of_agent.Add(time.Duration(round+1) * time.Hour)
  816. duration := send_out_timepoint.Sub(the_time_of_msg)
  817. gg.Group_delay_duration_sum = gg.Group_delay_duration_sum + duration
  818. each_round_delay = each_round_delay + duration
  819. }
  820. if not_cover_number != 0 {
  821. ave_delay := each_round_delay / time.Duration(not_cover_number)
  822. gg.Group_delay_in_each_round = append(gg.Group_delay_in_each_round, ave_delay)
  823. }
  824. }
  825. var each_group_process_lock sync.Mutex
  826. var each_group_process_wg sync.WaitGroup
  827. //same with v1
  828. func each_group_process(g *Group) {
  829. defer each_group_process_wg.Done()
  830. c_db12, err_db12 := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
  831. if err_db12 != nil {
  832. log.Println("err_db12=", err_db12)
  833. return
  834. }
  835. c_db12.Do("select", 12)
  836. middle_active_list := make([]string, 0)
  837. middle_inactive_list := make([]string, 0)
  838. if g.Group_Size == 1 {
  839. middle_inactive_list = append(middle_inactive_list, g.Group_User_List[0])
  840. } else {
  841. for _, v := range g.Group_User_List {
  842. each_group_process_lock.Lock()
  843. agent_queue_length, err_agent_queue_length := c_db12.Do("llen", v)
  844. each_group_process_lock.Unlock()
  845. if err_agent_queue_length != nil {
  846. log.Println("err_agent_queue_length=", err_agent_queue_length)
  847. return
  848. }
  849. length, err_length := redis.Int(agent_queue_length, nil)
  850. if err_length != nil {
  851. log.Println("err_length=", err_length)
  852. return
  853. }
  854. if g.Group_outlier_flag {
  855. if length == 0 || length >= delay_limitation {
  856. middle_inactive_list = append(middle_inactive_list, v)
  857. } else {
  858. middle_active_list = append(middle_active_list, v)
  859. }
  860. } else {
  861. if length >= 1 && length <= delay_limitation {
  862. middle_active_list = append(middle_active_list, v)
  863. } else {
  864. middle_inactive_list = append(middle_inactive_list, v)
  865. }
  866. }
  867. }
  868. }
  869. number := float64(0)
  870. //outlier
  871. if g.Group_outlier_flag {
  872. number = float64(g.Group_Size*g.Group_Active_Threshold) / float64(100)
  873. }
  874. if g.Group_active {
  875. log.Println("group_id=", g.Group_Id, "group,threshold number=", number, "active user number=", len(middle_active_list))
  876. group_active_function(g, middle_active_list, middle_inactive_list)
  877. } else {
  878. log.Println("group_id=", g.Group_Id, "group,threshold number=", number, "active user number", len(middle_active_list))
  879. }
  880. log.Println("group_id=", g.Group_Id, "group is ", g.Group_active, ",active user number=", len(middle_active_list), "inactive user number =", len(middle_inactive_list), "group_size=", g.Group_Size, "Group_User_List length=", len(g.Group_User_List))
  881. }
  882. var db12_send_to_receiver_wg sync.WaitGroup
  883. var active_user_channel_closed bool
  884. //same with v1
  885. func db12_send_to_receiver(c net.Conn) {
  886. defer db12_send_to_receiver_wg.Done()
  887. for {
  888. x, ok := <-active_user_channel
  889. if ok {
  890. jsons, err_jsons := json.Marshal(x)
  891. if err_jsons != nil {
  892. log.Println("db14, err_jsons=", err_jsons)
  893. }
  894. c.Write(jsons)
  895. } else {
  896. if active_user_channel_closed {
  897. return
  898. } else {
  899. continue
  900. }
  901. }
  902. }
  903. }
  904. var eliminated_user_channel_closed bool
  905. var online_eliminated_batchList_wg sync.WaitGroup
  906. //same with v1
  907. func online_eliminated_batchList() {
  908. defer online_eliminated_batchList_wg.Done()
  909. log.Println("online_eliminated_batchList starts")
  910. time.Sleep(time.Second * 2)
  911. httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002") //client address
  912. if err1 != nil {
  913. log.Println("online_eliminated_batchList conn err dial 192.168.32.144:20002 failed", err1)
  914. }
  915. number := 0
  916. eliminated_list := make([]string, 0)
  917. for {
  918. v, ok := <-eliminated_user_channel
  919. if ok {
  920. number++
  921. eliminated_list = append(eliminated_list, v)
  922. } else {
  923. if eliminated_user_channel_closed {
  924. break
  925. } else {
  926. continue
  927. }
  928. }
  929. }
  930. if len(eliminated_list) > 0 {
  931. sendToClient := ""
  932. for i, v := range eliminated_list {
  933. sendToClient = sendToClient + v + ","
  934. if (i%6000 == 0 && i != 0) || i == len(eliminated_list)-1 {
  935. sendToCli := []byte(sendToClient)
  936. httpConn_client.Write(sendToCli)
  937. sendToClient = ""
  938. }
  939. }
  940. time.Sleep(time.Second * 2)
  941. httpConn_client.Write([]byte("changeEnd"))
  942. } else {
  943. httpConn_client.Write([]byte("noChange"))
  944. }
  945. log.Println("len(eliminated_list)", len(eliminated_list), "number=", number)
  946. }
  947. //same with v1
  948. func online_cache_process() {
  949. c, c_err := net.Dial("tcp", "192.168.32.144:20001") //receiver address
  950. log.Println("online_cache_process starts")
  951. if c_err != nil {
  952. log.Println("online msg db12 conn err dial 192.168.32.144:20001 failed", c_err)
  953. }
  954. active_user_channel_closed = false
  955. eliminated_user_channel_closed = false
  956. for i := 0; i < 10; i++ {
  957. db12_send_to_receiver_wg.Add(1)
  958. go db12_send_to_receiver(c)
  959. }
  960. online_eliminated_batchList_wg.Add(1)
  961. go online_eliminated_batchList()
  962. log.Println("group_list length", len(group_list))
  963. for _, v := range group_list {
  964. each_group_process_wg.Add(1)
  965. each_group_process(v)
  966. }
  967. each_group_process_wg.Wait()
  968. close(active_user_channel)
  969. active_user_channel_closed = true
  970. close(eliminated_user_channel)
  971. eliminated_user_channel_closed = true
  972. db12_send_to_receiver_wg.Wait()
  973. online_eliminated_batchList_wg.Wait()
  974. }
  975. func record_group_size() {
  976. for _, v := range group_list {
  977. group_size_in_each_round[v.Group_Id] = append(group_size_in_each_round[v.Group_Id], v.Group_Size)
  978. }
  979. }
  980. func initMetric() {
  981. log.Println("initMetric() starts")
  982. Batch_list_threshold = 10000
  983. channelSize = Batch_list_threshold + 10000
  984. round = 0
  985. lastLearningPhase_end_flag = true
  986. packetLength = 3000
  987. k_val = 15
  988. group_Threshold_global = 30
  989. group_Threshold_outlier = 30
  990. delay_limitation = 2
  991. outlier_delay_limitation = 2
  992. Group_sleep_times_threshold = 5
  993. }
  994. func initNameListAndBatchList() {
  995. log.Println("initNameListAndBatchList() starts")
  996. Batch_List = make([]string, 0)
  997. NameList = make([]string, 0)
  998. number_group_hashmap = make(map[int]*Group)
  999. groupSet_Number = 0
  1000. group_list = make([]*Group, 0)
  1001. online_msg_in_each_group_in_each_round = make(map[int][]int)
  1002. user_group_hashmap = make(map[string]int)
  1003. group_size_in_each_round = make(map[int][]int)
  1004. err_result = make([]string, 0)
  1005. arrival_phase_start_list = make([]int, 0)
  1006. average_arrival_length = make([]int, 0)
  1007. arrival_phase_number_list = make([]int, 0)
  1008. }
  1009. func generateLineItems(result []int) []opts.LineData {
  1010. items := make([]opts.LineData, 0)
  1011. for i := 0; i < len(result); i++ {
  1012. items = append(items, opts.LineData{Value: result[i]})
  1013. }
  1014. return items
  1015. }
  1016. var err_result []string
  1017. var duration int
  1018. // the only difference with v1 and v2
  1019. func before() {
  1020. send := make([]string, 0) //users who should send message in this round
  1021. not_send := make([]string, 0) ////users who should not send message in this round
  1022. //receive all active users' uid
  1023. log.Println("before() starts")
  1024. listener, err := net.Listen("tcp", "192.168.32.144:20010")
  1025. if err != nil {
  1026. fmt.Println("before as server,listen 192.168.32.144:20010 failed", err)
  1027. return
  1028. }
  1029. conn, err := listener.Accept()
  1030. if err != nil {
  1031. fmt.Println("err,accept failed", err)
  1032. return
  1033. }
  1034. var tmp [10000]byte
  1035. res := ""
  1036. flag := false
  1037. for {
  1038. n, err := conn.Read(tmp[:])
  1039. if err != nil {
  1040. log.Println("err,read from conn failed", err)
  1041. break
  1042. }
  1043. //no active users,directly return
  1044. if string(tmp[:8]) == "noChange" {
  1045. log.Println("no before() active users in this round")
  1046. listener.Close()
  1047. flag = false
  1048. break
  1049. } else {
  1050. result := string(tmp[:n])
  1051. res = res + result
  1052. if string(tmp[:9]) == "changeEnd" {
  1053. fmt.Println("there are active users in this round")
  1054. listener.Close()
  1055. flag = true
  1056. break
  1057. }
  1058. }
  1059. }
  1060. //active users in each group
  1061. middle_each_group := make(map[int]int)
  1062. if flag {
  1063. res = strings.Replace(res, " ", "", -1)
  1064. res = strings.Replace(res, "\n", "", -1)
  1065. str_arr := strings.Split(res, ",")
  1066. //acitve users number in each group
  1067. for i := 0; i < len(str_arr)-1; i++ {
  1068. gid := user_group_hashmap[str_arr[i]]
  1069. middle_each_group[gid]++
  1070. }
  1071. log.Println("before,active users number", len(str_arr)-1)
  1072. for gid, v := range middle_each_group {
  1073. log.Println("gid=", gid, "threshold*size=", float64(group_list[gid].Group_Active_Threshold)*float64(group_list[gid].Group_Size)/float64(100), "v=", v)
  1074. if float64(group_list[gid].Group_Active_Threshold)*float64(group_list[gid].Group_Size)/float64(100) < float64(v) { //anonymity set is online
  1075. send = append(send, group_list[gid].Group_User_List...)
  1076. //self-adjusting online threshold part, online threshold changes between 27 and 33
  1077. if group_list[gid].Group_Active_Threshold < 33 {
  1078. group_list[gid].Group_Active_Threshold++
  1079. }
  1080. } else { //anonymity set is offline
  1081. not_send = append(not_send, group_list[gid].Group_User_List...)
  1082. if group_list[gid].Group_Active_Threshold > 27 {
  1083. group_list[gid].Group_Active_Threshold--
  1084. }
  1085. }
  1086. }
  1087. } else {
  1088. log.Println("before has active users")
  1089. }
  1090. log.Println("before, ", len(send), "users should send message in this round and", len(not_send), "users should not send message")
  1091. //send this feedback to client
  1092. time.Sleep(time.Second * 2)
  1093. c1, c_err := net.Dial("tcp", "192.168.32.144:20012") //client address
  1094. if c_err != nil {
  1095. log.Println("err,conn before in client ,err,dial 192.168.32.144:20012 failed", c_err)
  1096. }
  1097. if len(send) > 0 {
  1098. sendToAgent := ""
  1099. for i, v := range send {
  1100. sendToAgent = sendToAgent + v + ","
  1101. if (i%6000 == 0 && i != 0) || i == len(send)-1 {
  1102. sendToCli := []byte(sendToAgent)
  1103. c1.Write(sendToCli)
  1104. sendToAgent = ""
  1105. }
  1106. }
  1107. time.Sleep(time.Second * 2)
  1108. c1.Write([]byte("changeEnd"))
  1109. } else {
  1110. c1.Write([]byte("noChange"))
  1111. }
  1112. log.Println("before ends, ", len(send), "users should send,", len(not_send), "users should not send in this round")
  1113. }
  1114. func main() {
  1115. //log
  1116. logFile, err := os.OpenFile("log.txt", os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666)
  1117. if err != nil {
  1118. panic(err)
  1119. }
  1120. defer logFile.Close()
  1121. mw := io.MultiWriter(os.Stdout, logFile)
  1122. log.SetOutput(mw)
  1123. connID = 0
  1124. var connID int
  1125. listener, err := net.Listen("tcp", "192.168.32.144:20000")
  1126. if err != nil {
  1127. log.Println("start tcp server 192.168.32.144:20000 failed", err)
  1128. return
  1129. }
  1130. initNameListAndBatchList()
  1131. initMetric()
  1132. initDb()
  1133. for i := 0; i < 35; i++ {
  1134. log.Println("--------------------------------------------------the ", i, "th round starts----------------------------------------------")
  1135. eliminated_user_channel = make(chan string, 100000)
  1136. active_user_channel = make(chan string, 100000)
  1137. online_msg_number = 0
  1138. online_msg_number_empty = 0
  1139. round = i
  1140. cov_rec = 0
  1141. real_rec = 0
  1142. before()
  1143. conn, err := listener.Accept()
  1144. if err != nil {
  1145. log.Println("accept failed", err)
  1146. return
  1147. }
  1148. connID = connID + 1
  1149. log.Println("connID", connID)
  1150. peocessConn(conn, connID)
  1151. h24 := refresh_learningPhase_flag()
  1152. arrivalToLearn1()
  1153. learning_function(h24)
  1154. learning_To_online_batchList(h24)
  1155. online_msg_in_each_group_in_each_round_middle = make([]int, k_val*(groupSet_Number))
  1156. online_cache_process()
  1157. log.Println("online_msg_number=", online_msg_number, "online_msg_number_empty=", online_msg_number_empty)
  1158. record_group_size()
  1159. log.Println("len(NameList)", len(NameList))
  1160. log.Println("cov_rec,real_rec are", cov_rec, real_rec)
  1161. log.Println("---------------------------------------------------next round---------------------------------------------------")
  1162. }
  1163. x_zhou := make([]int, len(group_list))
  1164. for i := 0; i < len(group_list); i++ {
  1165. x_zhou[i] = i
  1166. }
  1167. ave_groupsize := make([]int, len(group_size_in_each_round[0]))
  1168. for i, v := range group_list {
  1169. line := charts.NewLine()
  1170. line.SetGlobalOptions(
  1171. charts.WithInitializationOpts(opts.Initialization{Theme: types.ThemeWesteros}),
  1172. charts.WithTitleOpts(opts.Title{
  1173. Title: ("group_id=" + strconv.Itoa(v.Group_Id)),
  1174. Subtitle: "k_val=" + strconv.Itoa(k_val) + ";delay_limitation=" + strconv.Itoa(delay_limitation) + ";Group_Active_Threshold=" + strconv.Itoa(v.Group_Active_Threshold),
  1175. }))
  1176. log.Println("group_id=", i, "group's group_size", group_size_in_each_round[v.Group_Id])
  1177. if !v.Group_low_frequence {
  1178. for i, v := range group_size_in_each_round[v.Group_Id] {
  1179. if v == 0 {
  1180. ave_groupsize[i] = ave_groupsize[i] + 1
  1181. } else {
  1182. ave_groupsize[i] = ave_groupsize[i] + v
  1183. }
  1184. }
  1185. }
  1186. log.Println("group_id=", i, "group,online_msg_in_each_group_in_each_round", online_msg_in_each_group_in_each_round[v.Group_Id])
  1187. line.SetXAxis(x_zhou).
  1188. AddSeries("Category A", generateLineItems(online_msg_in_each_group_in_each_round[v.Group_Id])).
  1189. AddSeries("Category B", generateLineItems(group_size_in_each_round[v.Group_Id])).
  1190. SetSeriesOptions(charts.WithLineChartOpts(opts.LineChart{Step: true, ConnectNulls: true}))
  1191. name1 := "group_"
  1192. name2 := ".html"
  1193. name := name1 + strconv.Itoa(i) + name2
  1194. w, _ := os.Create(name)
  1195. line.Render(w)
  1196. }
  1197. log.Println(err_result)
  1198. log.Println("k_val=", k_val, "duration=", duration)
  1199. log.Println("arrival phase", arrival_phase_number_list, average_arrival_length)
  1200. sum_number := 0
  1201. sum_length := 0
  1202. sum_duration := time.Duration(0)
  1203. sum_send_times := 0
  1204. for _, v := range group_list {
  1205. if !v.Group_low_frequence && !v.Group_outlier_flag {
  1206. log.Println(v.Group_Id, "eliminated users number=", v.Group_Size_copy)
  1207. sum_length = v.Group_online_sum_length + sum_length
  1208. sum_number = sum_number + v.Group_Size_copy
  1209. sum_duration = sum_duration + v.Group_delay_duration_sum
  1210. sum_send_times = sum_send_times + v.Group_send_times
  1211. if v.Group_send_times == 0 {
  1212. log.Println(v.Group_Id)
  1213. continue
  1214. }
  1215. }
  1216. }
  1217. log.Println("average online phase length=", float64(sum_length)/float64(sum_number), "eliminated users sum number", sum_number, "average latency in online phase", sum_duration/time.Duration(sum_send_times), "check total_cover=,", total_cover, "average cover", float64(total_cover)/float64(sum_number))
  1218. ave_groupsize_double := make([]float64, len(ave_groupsize))
  1219. for i, v := range ave_groupsize {
  1220. ave_groupsize_double[i] = math.Round((float64(v) / float64(k_val-1)))
  1221. }
  1222. log.Println("average group size=", ave_groupsize_double)
  1223. log.Println("group size sum=", ave_groupsize)
  1224. }