main.go 32 KB


  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "log"
  8. "math"
  9. "net"
  10. "os"
  11. "os/exec"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. // "github.com/go-echarts/go-echarts/v2/charts"
  17. // "github.com/go-echarts/go-echarts/v2/opts"
  18. // "github.com/go-echarts/go-echarts/v2/types"
  19. "github.com/go-echarts/go-echarts/v2/opts"
  20. "github.com/gomodule/redigo/redis"
  21. )
  22. var channelSize int
  23. var connID int
  24. type Rec struct {
  25. Megred_Timestamp []string
  26. Merged_Hashtags []string
  27. Megred_User_Id string
  28. Megred_Ip string
  29. Cover_Msg string
  30. }
  31. var arrival_phase_start_list []int
  32. var arrival_phase_number_list []int
  33. var arrival_phase_number_list_lock sync.Mutex
  34. var average_arrival_length []int
  35. var Status sync.Map
  36. var NameList []string
  37. var NameList_Append_Lock sync.Mutex
  38. var Batch_List []string
  39. var Batch_List_Append_Lock sync.Mutex
  40. var Batch_list_threshold int
  41. var putInChannel chan string
  42. var receive_wg sync.WaitGroup
  43. var peocessConn_wg sync.WaitGroup
  44. var uid_channel chan string
  45. var uip_channel chan string
  46. var db14_agentqueue_key chan string
  47. var db14_agentqueue_val chan string
  48. var db14_agentqueue_Lock sync.Mutex
  49. var db13_learning_key chan string
  50. var db13_learning_val chan int
  51. var db13_learning_Lock sync.Mutex
  52. var db12_online_key chan string
  53. var db12_online_val chan string
  54. var db12_online_Lock sync.Mutex
  55. var online_msg_number int
  56. var online_msg_number_empty int
  57. var online_msg_in_each_group_in_each_round map[int][]int
  58. var group_size_in_each_round map[int][]int
  59. var online_msg_in_each_group_in_each_round_middle_wg sync.Mutex
  60. var online_msg_in_each_group_in_each_round_middle []int
  61. var user_group_hashmap map[string]int
  62. var sendDBToReceiver chan string
  63. var cleanDBToReceiver_wg sync.WaitGroup
  64. var sendDBToReceiver_lock sync.Mutex
  65. var cov_rec int
  66. var real_rec int
  67. var cov_lock sync.Mutex
  68. var real_lock sync.Mutex
  69. var packetLength int
  70. var total_cover int
  71. var total_cover_lock sync.Mutex
  72. var groupSet_Number int
  73. var number_group_hashmap map[int]*Group
  74. type Group struct {
  75. Group_Id int
  76. Group_User_List []string
  77. Group_Size int
  78. Group_Size_copy int
  79. Group_Active_Threshold int
  80. Group_active bool
  81. Group_low_frequence bool
  82. Group_start_round int
  83. Group_online_sum_length int
  84. Group_delay_duration_sum time.Duration
  85. Group_outlier_flag bool
  86. Group_send_times int
  87. Group_sleep_times int
  88. Group_cover int
  89. }
  90. var group_list []*Group
  91. //same with v1 and v2
  92. func receive(c net.Conn) {
  93. defer receive_wg.Done()
  94. for {
  95. a, ok_return := <-putInChannel
  96. if !ok_return {
  97. return
  98. }
  99. mid := []byte(a)
  100. length := 0
  101. if a == "" {
  102. continue
  103. }
  104. for i := len(a) - 1; ; i-- {
  105. if i < 0 {
  106. log.Println("err,a=", a, "len(a)=", len(a), "len(mid)=", len(mid), "mid=", mid)
  107. }
  108. if mid[i] == 125 { //125=}
  109. length = i
  110. break
  111. }
  112. }
  113. mid = mid[:length+1]
  114. var stru map[string]interface{}
  115. err_unmarshal := json.Unmarshal(mid, &stru)
  116. if err_unmarshal != nil {
  117. log.Println(",err_unmarshal=", err_unmarshal, "len(mid=)", len(mid), "len(a)=", len(a))
  118. }
  119. var structure Rec
  120. //1,time
  121. times, err_time := redis.Strings(stru["Megred_Timestamp"], nil)
  122. if err_time != nil {
  123. log.Println("err_time", err_time, "a=", a)
  124. }
  125. structure.Megred_Timestamp = times
  126. //2,msg
  127. msgs, err_msgs := redis.Strings(stru["Megred_Hashtags"], nil)
  128. if err_msgs != nil {
  129. log.Println("err_msgs", err_msgs, "a", a)
  130. }
  131. structure.Merged_Hashtags = msgs
  132. //3,uid
  133. uid, err_uid := redis.String(stru["Megred_User_Id"], nil)
  134. if err_uid != nil {
  135. log.Println("err_uid", err_uid, "a", a)
  136. }
  137. structure.Megred_User_Id = uid
  138. //4,ip
  139. ip, err_ip := redis.String(stru["Megred_Ip"], nil)
  140. if err_ip != nil {
  141. log.Println("err_ip", err_ip, "a", a)
  142. }
  143. structure.Megred_Ip = ip
  144. //5.cover
  145. cov, err_cover := redis.String(stru["Cover_Msg"], nil)
  146. if err_cover != nil {
  147. log.Println("err_cover", err_cover)
  148. break
  149. }
  150. structure.Cover_Msg = cov
  151. staa, ok := Status.Load(structure.Megred_User_Id)
  152. if !ok {
  153. if structure.Megred_User_Id == "" {
  154. continue
  155. }
  156. Status.Store(structure.Megred_User_Id, 0)
  157. NameList_Append_Lock.Lock()
  158. NameList = append(NameList, structure.Megred_User_Id)
  159. NameList_Append_Lock.Unlock()
  160. Batch_List_Append_Lock.Lock()
  161. Batch_List = append(Batch_List, structure.Megred_User_Id)
  162. Batch_List_Append_Lock.Unlock()
  163. db14_agentqueue_Lock.Lock()
  164. db14_agentqueue_key <- structure.Megred_User_Id
  165. db14_agentqueue_val <- a
  166. db14_agentqueue_Lock.Unlock()
  167. arrival_phase_start_list = append(arrival_phase_start_list, round)
  168. continue
  169. } else {
  170. if staa == 0 {
  171. //1,arrival
  172. db14_agentqueue_Lock.Lock()
  173. db14_agentqueue_key <- structure.Megred_User_Id
  174. db14_agentqueue_val <- a
  175. db14_agentqueue_Lock.Unlock()
  176. continue
  177. }
  178. //b,learning
  179. if staa == 1 {
  180. if structure.Cover_Msg == "yes" {
  181. db13_learning_Lock.Lock()
  182. db13_learning_key <- structure.Megred_User_Id
  183. db13_learning_val <- 0
  184. db13_learning_Lock.Unlock()
  185. cov_lock.Lock()
  186. cov_rec++
  187. cov_lock.Unlock()
  188. } else {
  189. db13_learning_Lock.Lock()
  190. db13_learning_key <- structure.Megred_User_Id
  191. db13_learning_val <- 1
  192. db13_learning_Lock.Unlock()
  193. real_lock.Lock()
  194. real_rec++
  195. real_lock.Unlock()
  196. }
  197. c.Write(mid)
  198. continue
  199. }
  200. //c,online
  201. if staa == 2 {
  202. db12_online_Lock.Lock()
  203. db12_online_key <- structure.Megred_User_Id
  204. db12_online_val <- a
  205. online_msg_number++
  206. db12_online_Lock.Unlock()
  207. online_msg_in_each_group_in_each_round_middle_wg.Lock()
  208. gid := user_group_hashmap[structure.Megred_User_Id]
  209. online_msg_in_each_group_in_each_round_middle[gid]++
  210. online_msg_in_each_group_in_each_round_middle_wg.Unlock()
  211. if structure.Cover_Msg == "yes" {
  212. total_cover_lock.Lock()
  213. total_cover++
  214. total_cover_lock.Unlock()
  215. }
  216. continue
  217. }
  218. }
  219. }
  220. }
  221. //db14,sima with v1
  222. var db14_lpush_wg sync.WaitGroup
  223. var db14_lpush_lock sync.Mutex
  224. func db14_lpush() {
  225. defer db14_lpush_wg.Done()
  226. arrival_msg_num := 0
  227. log.Println("db14_rpush() starts")
  228. c_db14, _ := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
  229. c_db14.Do("select", 14)
  230. for {
  231. db14_lpush_lock.Lock()
  232. x, ok1 := <-db14_agentqueue_key
  233. y, ok2 := <-db14_agentqueue_val
  234. db14_lpush_lock.Unlock()
  235. if ok1 && ok2 {
  236. c_db14.Do("lpush", x, y)
  237. arrival_msg_num++
  238. }
  239. if (!ok1 || !ok2) && db13_db14_db12_waitForReceive_flag {
  240. log.Println("db14_lpush() ends,arrival_msg_num=", arrival_msg_num)
  241. return
  242. }
  243. }
  244. }
  245. //dn13,same with v1
  246. var db13_rpush_wg sync.WaitGroup
  247. var db13_rpush_lock sync.Mutex
  248. func db13_rpush() {
  249. defer db13_rpush_wg.Done()
  250. log.Println("db13_rpush() starts")
  251. real := 0
  252. cov := 0
  253. c_db13, _ := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
  254. c_db13.Do("select", 13)
  255. for {
  256. db13_rpush_lock.Lock()
  257. x, ok1 := <-db13_learning_key
  258. y, ok2 := <-db13_learning_val
  259. db13_rpush_lock.Unlock()
  260. if ok1 && ok2 {
  261. if y == 0 {
  262. cov = cov + 1
  263. } else {
  264. real = real + 1
  265. }
  266. c_db13.Do("rpush", x, y)
  267. }
  268. if db13_db14_db12_waitForReceive_flag && (!ok1 || !ok2) {
  269. log.Println("db13_rpush() ends,real,cov are", real, cov, "total number", real+cov)
  270. return
  271. }
  272. }
  273. }
  274. //db12,same with v1
  275. var db12_lpush_wg sync.WaitGroup
  276. var db12_lpush_lock sync.Mutex
  277. var test_list []string
  278. func db12_lpush() {
  279. defer db12_lpush_wg.Done()
  280. online_msg := 0
  281. log.Println("db12_lpush()starts")
  282. c_db12, _ := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
  283. c_db12.Do("select", 12)
  284. for {
  285. db12_lpush_lock.Lock()
  286. x, ok1 := <-db12_online_key
  287. test_list = append(test_list, x)
  288. y, ok2 := <-db12_online_val
  289. db12_lpush_lock.Unlock()
  290. if ok1 && ok2 {
  291. c_db12.Do("lpush", x, y)
  292. online_msg++
  293. }
  294. if db13_db14_db12_waitForReceive_flag && (!ok1 || !ok2) {
  295. log.Println("db12_lpush() ends,online_msg", online_msg)
  296. return
  297. }
  298. }
  299. }
  300. func initDb() {
  301. log.Println("initDb()")
  302. c, c_err := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
  303. if c_err != nil {
  304. log.Println("c_err=", c_err)
  305. }
  306. c.Do("select", 14)
  307. c.Do("Flushdb")
  308. c.Do("select", 13)
  309. c.Do("Flushdb")
  310. c.Do("select", 12)
  311. c.Do("Flushdb")
  312. }
  313. func initChannelAndListAndMap() {
  314. log.Println("initChannelAndListAndMap")
  315. putInChannel = make(chan string, channelSize)
  316. db14_agentqueue_key = make(chan string, channelSize)
  317. db14_agentqueue_val = make(chan string, channelSize)
  318. db13_learning_key = make(chan string, channelSize)
  319. db13_learning_val = make(chan int, channelSize)
  320. db12_online_key = make(chan string, channelSize)
  321. db12_online_val = make(chan string, channelSize)
  322. }
  323. var db13_db14_db12_waitForReceive_flag bool
  324. //same with v1
  325. func peocessConn(conn net.Conn, connID int) {
  326. db13_db14_db12_waitForReceive_flag = false
  327. log.Println("peocessConn starts")
  328. initChannelAndListAndMap()
  329. db14_lpush_wg.Add(1)
  330. go db14_lpush()
  331. db13_rpush_wg.Add(1)
  332. go db13_rpush()
  333. db12_lpush_wg.Add(1)
  334. go db12_lpush()
  335. c, c_err := net.Dial("tcp", "192.168.32.144:20001") //recipients address
  336. if c_err != nil {
  337. log.Println("conn err,dial 192.168.32.144:20001 failed", c_err)
  338. }
  339. for i := 0; i < 500; i++ {
  340. receive_wg.Add(1)
  341. go receive(c)
  342. }
  343. var tmp [3000]byte
  344. index := 0
  345. total_msg_number := 0
  346. for {
  347. total_msg_number++
  348. _, err := conn.Read(tmp[:])
  349. if err != nil {
  350. log.Println("read from conn failed", err)
  351. return
  352. }
  353. if string(tmp[:8]) == "roundEnd" {
  354. log.Println("client ends the conn,total_msg_number", total_msg_number)
  355. break
  356. }
  357. //Sticky bag problem
  358. if tmp[0] != 123 {
  359. var corrext_tmp []byte
  360. var corrext_follow []byte
  361. var correct_length int
  362. for i, v := range tmp[:] {
  363. if v == 123 {
  364. correct_length = i
  365. corrext_follow = make([]byte, correct_length)
  366. _, err_follow := conn.Read(corrext_follow[:])
  367. if err_follow != nil {
  368. log.Println("read from conn failed,err_follow=", err_follow)
  369. return
  370. }
  371. corrext_tmp = append(tmp[i:], corrext_follow[:]...)
  372. if len(corrext_tmp) != packetLength {
  373. log.Println("len(corrext_tmp)", len(corrext_tmp))
  374. }
  375. break
  376. }
  377. }
  378. putInChannel <- string(corrext_tmp[:])
  379. index++
  380. log.Println("total_msg_number=", total_msg_number)
  381. continue
  382. }
  383. putInChannel <- string(tmp[:])
  384. }
  385. close(putInChannel)
  386. receive_wg.Wait()
  387. //check if each anonymity set is active in this round
  388. if len(group_list) > 0 {
  389. for _, v := range group_list {
  390. online_msg_in_each_group_in_each_round[v.Group_Id] = append(online_msg_in_each_group_in_each_round[v.Group_Id], online_msg_in_each_group_in_each_round_middle[v.Group_Id])
  391. number := float64(0)
  392. if v.Group_outlier_flag {
  393. number = float64(group_list[v.Group_Id].Group_Size*group_Threshold_outlier) / float64(100)
  394. } else {
  395. number = float64(group_list[v.Group_Id].Group_Size*group_list[v.Group_Id].Group_Active_Threshold) / float64(100)
  396. }
  397. if number >= float64(online_msg_in_each_group_in_each_round_middle[v.Group_Id]) {
  398. group_list[v.Group_Id].Group_active = false
  399. group_list[v.Group_Id].Group_sleep_times++
  400. } else {
  401. group_list[v.Group_Id].Group_active = true
  402. group_list[v.Group_Id].Group_sleep_times = 0
  403. }
  404. }
  405. }
  406. db13_db14_db12_waitForReceive_flag = true
  407. close(db14_agentqueue_key)
  408. close(db14_agentqueue_val)
  409. close(db13_learning_key)
  410. close(db13_learning_val)
  411. close(db12_online_key)
  412. close(db12_online_val)
  413. db13_rpush_wg.Wait()
  414. db14_lpush_wg.Wait()
  415. db12_lpush_wg.Wait()
  416. log.Println("peocessConn ends,len(NameList)=", len(NameList), "Blen(Batch_List)", len(Batch_List))
  417. }
  418. //same eith in v1
  419. func cleanDBToReceiver(httpConn net.Conn) {
  420. defer cleanDBToReceiver_wg.Done()
  421. time.Sleep(time.Millisecond * 300)
  422. c, _ := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
  423. c.Do("select", 14)
  424. for {
  425. x, ok := <-sendDBToReceiver
  426. if ok {
  427. sendDBToReceiver_lock.Lock()
  428. queueLen, _ := c.Do("llen", x)
  429. ql, err_ql := redis.Int(queueLen, nil)
  430. if err_ql != nil {
  431. log.Println("db14 err_ql=", err_ql)
  432. }
  433. for i := 0; i < ql; i++ {
  434. result, err_rpop := c.Do("rpop", x)
  435. if err_rpop != nil {
  436. log.Println("db14 err_rpop=", err_rpop)
  437. }
  438. res, err_res := redis.String(result, nil)
  439. if err_res != nil {
  440. log.Println("db14 err_res=", err_res)
  441. }
  442. jsons, err_jsons := json.Marshal(res)
  443. if err_jsons != nil {
  444. log.Println("db14 err_jsons=", err_jsons)
  445. }
  446. httpConn.Write(jsons)
  447. }
  448. sendDBToReceiver_lock.Unlock()
  449. } else {
  450. return
  451. }
  452. }
  453. }
  454. var learningphase_start_round int
  455. var round int
  456. var last_Batch_List []string
  457. var last_Batch_List_to_online []string
  458. var lastLearningPhase_end_flag bool
  459. //same with v1
  460. func arrivalToLearn1() bool {
  461. log.Println(",arrivalToLearn1() starts")
  462. time.Sleep(time.Millisecond * 300)
  463. if len(Batch_List) >= Batch_list_threshold {
  464. if !lastLearningPhase_end_flag {
  465. httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002")
  466. if err1 != nil {
  467. log.Println(" conn err dial 192.168.32.144:20002 failed", err1)
  468. }
  469. sendToCli := []byte("noChange")
  470. httpConn_client.Write(sendToCli)
  471. return false
  472. }
  473. learningphase_start_round = round
  474. last_Batch_List = make([]string, len(Batch_List))
  475. copy(last_Batch_List, Batch_List)
  476. httpConn_receiver, err2 := net.Dial("tcp", "192.168.32.144:20001")
  477. if err2 != nil {
  478. log.Println("conn err,dial 192.168.32.144:20001 failed", err2)
  479. }
  480. sendDBToReceiver = make(chan string, channelSize)
  481. for i := 0; i < 100; i++ {
  482. cleanDBToReceiver_wg.Add(1)
  483. go cleanDBToReceiver(httpConn_receiver)
  484. }
  485. middle_sum_number := 0
  486. for _, v := range arrival_phase_start_list {
  487. middle_sum_number = middle_sum_number + round - v
  488. }
  489. arrival_phase_number_list_lock.Lock()
  490. average_arrival_length = append(average_arrival_length, middle_sum_number/len(arrival_phase_start_list))
  491. arrival_phase_number_list = append(arrival_phase_number_list, len(arrival_phase_start_list))
  492. arrival_phase_number_list_lock.Unlock()
  493. arrival_phase_start_list = make([]int, 0)
  494. httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002")
  495. if err1 != nil {
  496. log.Println("conn err dial 192.168.32.144:20002 failed", err1)
  497. }
  498. sendToClient := ""
  499. for i, v := range Batch_List {
  500. sendDBToReceiver <- v
  501. Status.Store(v, 1)
  502. sendToClient = sendToClient + v + ","
  503. if (i%6000 == 0 && i != 0) || i == len(Batch_List)-1 {
  504. sendToCli := []byte(sendToClient)
  505. httpConn_client.Write(sendToCli)
  506. sendToClient = ""
  507. }
  508. }
  509. log.Println("len(Batch_List)", len(Batch_List))
  510. close(sendDBToReceiver)
  511. time.Sleep(time.Second * 2)
  512. httpConn_client.Write([]byte("changeEnd"))
  513. Batch_List = make([]string, 0)
  514. cleanDBToReceiver_wg.Wait()
  515. c, c_err := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
  516. if c_err != nil {
  517. log.Println("conn err,c_err=", c_err)
  518. }
  519. c.Do("select", 14)
  520. c.Do("flushdb")
  521. log.Println("len(Batch_List)=", len(Batch_List))
  522. lastLearningPhase_end_flag = false
  523. return true
  524. } else {
  525. httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002")
  526. if err1 != nil {
  527. log.Println("conn err dial 192.168.32.144:20002 failed", err1)
  528. }
  529. sendToCli := []byte("noChange")
  530. httpConn_client.Write(sendToCli)
  531. return false
  532. }
  533. }
  534. //same with v1
  535. func refresh_learningPhase_flag() bool {
  536. defer log.Println("refresh_learningPhase_flag ends")
  537. log.Println("refresh_learningPhase_flag starts")
  538. if round-learningphase_start_round == 24 {
  539. log.Println("round=", round, "learningphase_start_round=", learningphase_start_round)
  540. lastLearningPhase_end_flag = true
  541. if round >= 33 {
  542. lastLearningPhase_end_flag = false
  543. return false
  544. }
  545. last_Batch_List_to_online = make([]string, len(last_Batch_List))
  546. copy(last_Batch_List_to_online, last_Batch_List)
  547. return true
  548. }
  549. return false
  550. }
  551. //same with v1
  552. func learning_To_online_batchList(flag bool) {
  553. log.Println("learning_To_online starts,flag=", flag)
  554. time.Sleep(time.Second * 1)
  555. if !flag {
  556. httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002")
  557. if err1 != nil {
  558. log.Println("conn err dial 192.168.32.144:20002 failed", err1)
  559. }
  560. sendToCli := []byte("noChange")
  561. httpConn_client.Write(sendToCli)
  562. return
  563. } else {
  564. httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002")
  565. if err1 != nil {
  566. log.Println("conn err dial 192.168.32.144:20002 failed", err1)
  567. }
  568. sendToClient := ""
  569. for i, v := range last_Batch_List_to_online {
  570. Status.Store(v, 2)
  571. sendToClient = sendToClient + v + ","
  572. if (i%6000 == 0 && i != 0) || i == len(last_Batch_List_to_online)-1 {
  573. sendToCli := []byte(sendToClient)
  574. httpConn_client.Write(sendToCli)
  575. sendToClient = ""
  576. }
  577. }
  578. time.Sleep(time.Second * 1)
  579. httpConn_client.Write([]byte("changeEnd"))
  580. }
  581. }
  582. //same with v1
  583. func learning_function(flag bool) {
  584. defer log.Println("learning_function ends")
  585. log.Println("learning_function starts")
  586. if flag {
  587. log.Println("round=", round, "learningphase_start_round=", learningphase_start_round)
  588. //1,执行output.sh;
  589. _, err := exec.Command("/bin/bash", "/home/it/middle_data/output.sh").CombinedOutput()
  590. if err != nil {
  591. log.Println("err,cmd.Output:", err)
  592. return
  593. }
  594. log.Println("output.sh ends")
  595. starttime := time.Now()
  596. //final.py
  597. cmd, err_cmd := exec.Command("python3", "/home/it/learning-phase/final.py").CombinedOutput()
  598. if err_cmd != nil {
  599. log.Println("cmd.Output err,err_cmd=", err_cmd)
  600. return
  601. }
  602. log.Println("python result", string(cmd))
  603. cost := int(time.Since(starttime) / time.Second)
  604. duration = cost
  605. //2.5 combine.sh
  606. _, err_combine := exec.Command("/bin/bash", "/home/it/middle_data/combine.sh").CombinedOutput()
  607. if err_combine != nil {
  608. log.Println("combine err,cmd.Output:", err_combine)
  609. return
  610. }
  611. log.Println("combine.sh ends")
  612. filePath1 := "/home/it/middle_data/filtered_good_name_list.txt"
  613. content1, err1 := ioutil.ReadFile(filePath1)
  614. if err1 != nil {
  615. panic(err1)
  616. }
  617. m1 := strings.Split(string(content1), "\n")
  618. filePath2 := "/home/it/middle_data/groupingResult.txt"
  619. content2, err2 := ioutil.ReadFile(filePath2)
  620. if err2 != nil {
  621. panic(err1)
  622. }
  623. m2 := strings.Split(string(content2), "\n")
  624. log.Println("m1 length=", len(m1), "m2 length=", len(m2))
  625. for i := 0; i < k_val; i++ {
  626. var g Group
  627. g.Group_Id = i + k_val*groupSet_Number
  628. g.Group_Active_Threshold = group_Threshold_global
  629. g.Group_Size = 0
  630. g.Group_Size_copy = 0
  631. g.Group_active = false
  632. g.Group_User_List = make([]string, 0)
  633. g.Group_start_round = round
  634. g.Group_online_sum_length = 0
  635. g.Group_delay_duration_sum = time.Duration(0)
  636. g.Group_send_times = 0
  637. g.Group_sleep_times = 0
  638. g.Group_cover = 0
  639. if i == k_val-1 {
  640. g.Group_low_frequence = true
  641. } else {
  642. g.Group_low_frequence = false
  643. }
  644. //outlier
  645. if i == k_val-2 {
  646. g.Group_outlier_flag = true
  647. } else {
  648. g.Group_outlier_flag = false
  649. }
  650. online_msg_in_each_group_in_each_round[g.Group_Id] = make([]int, 0)
  651. group_size_in_each_round[g.Group_Id] = make([]int, 0)
  652. group_list = append(group_list, &g)
  653. number_group_hashmap[i+k_val*groupSet_Number] = &g
  654. }
  655. for i := 0; i < len(m1)-1; i++ {
  656. if i < len(m2)-2 {
  657. intm2, err_m2 := strconv.Atoi(m2[i])
  658. if err_m2 != nil {
  659. log.Println("err_m2=", err_m2)
  660. return
  661. }
  662. user_group_hashmap[m1[i]] = intm2 + groupSet_Number*k_val
  663. middle_g, ok_middle_g := number_group_hashmap[intm2+groupSet_Number*k_val]
  664. if !ok_middle_g {
  665. log.Println(" err group id=", intm2+groupSet_Number*k_val)
  666. } else {
  667. middle_g.Group_Size++
  668. middle_g.Group_User_List = append(middle_g.Group_User_List, m1[i])
  669. }
  670. } else {
  671. user_group_hashmap[m1[i]] = (groupSet_Number+1)*k_val - 1
  672. middle_g, ok_middle_g := number_group_hashmap[(groupSet_Number+1)*k_val-1]
  673. if !ok_middle_g {
  674. log.Println("err group id=", (groupSet_Number+1)*k_val-1)
  675. return
  676. } else {
  677. middle_g.Group_Size++
  678. middle_g.Group_User_List = append(middle_g.Group_User_List, m1[i])
  679. }
  680. }
  681. }
  682. sum := 0
  683. for i := 0; i < k_val; i++ {
  684. gg := number_group_hashmap[i+k_val*groupSet_Number]
  685. log.Println("id=", gg.Group_Id, "i+ k_val*groupSet_Number=", i+k_val*groupSet_Number, "的group,其size=", gg.Group_Size)
  686. sum = sum + gg.Group_Size
  687. }
  688. log.Println("groupSet_Number=", groupSet_Number)
  689. groupSet_Number++
  690. log.Println("groupSet_Number=", groupSet_Number)
  691. return
  692. } else {
  693. return
  694. }
  695. }
  696. var group_active_function_db12_lock sync.Mutex
  697. var group_active_function_db12_rpop_lock sync.Mutex
  698. var eliminated_user_channel chan string
  699. var active_user_channel chan string
  700. var k_val int
  701. var group_Threshold_global int
  702. var group_Threshold_outlier int
  703. var delay_limitation int
  704. var outlier_delay_limitation int
  705. var Group_sleep_times_threshold int
  706. //same with v1
  707. func group_active_function(gg *Group, list_1 []string, list_2 []string) {
  708. defer log.Println("group_id=", gg.Group_Id, "group_active_function ends,Group_Size=", gg.Group_Size)
  709. log.Println("group_id=", gg.Group_Id, "group group_active_function starts Group_Size=", gg.Group_Size)
  710. c_active, err_active := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
  711. if err_active != nil {
  712. log.Println("group_active_function conn err,err_active=", err_active)
  713. return
  714. }
  715. gg.Group_online_sum_length = gg.Group_online_sum_length + len(list_2)*(round-gg.Group_start_round)
  716. gg.Group_Size_copy = gg.Group_Size_copy + len(list_2)
  717. for _, v := range list_2 {
  718. x, load_ok := Status.Load(v)
  719. if load_ok {
  720. if x != 2 {
  721. err_result = append(err_result, v)
  722. }
  723. }
  724. Status.Delete(v)
  725. eliminated_user_channel <- v
  726. c_active.Do("select", 12)
  727. group_active_function_db12_lock.Lock()
  728. _, err_del := c_active.Do("del", v)
  729. group_active_function_db12_lock.Unlock()
  730. if err_del != nil {
  731. log.Println("err_del=", err_del)
  732. return
  733. }
  734. }
  735. gg.Group_Size = len(list_1)
  736. gg.Group_User_List = list_1
  737. c_active.Do("select", 12)
  738. for _, v := range list_1 {
  739. x, load_ok := Status.Load(v)
  740. if load_ok {
  741. if x != 2 {
  742. err_result = append(err_result, v)
  743. }
  744. }
  745. group_active_function_db12_rpop_lock.Lock()
  746. result, err_rpop := c_active.Do("rpop", v)
  747. group_active_function_db12_rpop_lock.Unlock()
  748. if err_rpop != nil {
  749. log.Println("err_rpop=", err_rpop)
  750. }
  751. res, err_string := redis.String(result, nil)
  752. if err_string != nil {
  753. log.Println("err_string=", err_string)
  754. }
  755. active_user_channel <- res
  756. mid := []byte(res)
  757. length := 0
  758. for i := len(res) - 1; ; i-- {
  759. if i < 0 {
  760. log.Println("err,res=", res, "len(res)=", len(res), "len(mid)=", len(mid), "mid=", mid)
  761. }
  762. if mid[i] == 125 { //125代表}
  763. length = i
  764. break
  765. }
  766. }
  767. mid = mid[:length+1]
  768. var stru map[string]interface{}
  769. err_unmarshal := json.Unmarshal(mid, &stru)
  770. if err_unmarshal != nil {
  771. log.Println("err_unmarshal=", err_unmarshal, "len(mid=)", len(mid))
  772. }
  773. cov, err_cover := redis.String(stru["Cover_Msg"], nil)
  774. if err_cover != nil {
  775. log.Println("err_cover", err_cover)
  776. break
  777. }
  778. if cov == "yes" {
  779. continue
  780. }
  781. gg.Group_send_times = gg.Group_send_times + 1
  782. times, err_time := redis.Strings(stru["Megred_Timestamp"], nil)
  783. if err_time != nil {
  784. log.Println("err_time", err_time, "a=", res)
  785. }
  786. msg_time := times[0]
  787. loc, err_loc := time.LoadLocation("Local")
  788. if err_loc != nil {
  789. fmt.Println("err_loc=", err_loc)
  790. }
  791. the_time_of_msg, err_the_time_msg := time.ParseInLocation("20060102150405", msg_time, loc)
  792. if err_the_time_msg != nil {
  793. fmt.Println("err_the_time_msg=", err_the_time_msg)
  794. }
  795. the_time_of_agent, err_the_time_agent := time.ParseInLocation("20060102150405", "20121101120000", loc)
  796. if err_the_time_agent != nil {
  797. fmt.Println("err_the_time_agent=", err_the_time_agent)
  798. }
  799. send_out_timepoint := the_time_of_agent.Add(time.Duration(round+1) * time.Hour)
  800. duration := send_out_timepoint.Sub(the_time_of_msg)
  801. gg.Group_delay_duration_sum = gg.Group_delay_duration_sum + duration
  802. }
  803. }
  804. var each_group_process_lock sync.Mutex
  805. var each_group_process_wg sync.WaitGroup
  806. //same with it in v1
  807. func each_group_process(g *Group) {
  808. defer each_group_process_wg.Done()
  809. c_db12, err_db12 := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
  810. if err_db12 != nil {
  811. log.Println("err_db12=", err_db12)
  812. return
  813. }
  814. c_db12.Do("select", 12)
  815. middle_active_list := make([]string, 0)
  816. middle_inactive_list := make([]string, 0)
  817. if g.Group_Size == 1 {
  818. middle_inactive_list = append(middle_inactive_list, g.Group_User_List[0])
  819. } else {
  820. for _, v := range g.Group_User_List {
  821. each_group_process_lock.Lock()
  822. agent_queue_length, err_agent_queue_length := c_db12.Do("llen", v)
  823. each_group_process_lock.Unlock()
  824. if err_agent_queue_length != nil {
  825. log.Println("err_agent_queue_length=", err_agent_queue_length)
  826. return
  827. }
  828. length, err_length := redis.Int(agent_queue_length, nil)
  829. if err_length != nil {
  830. log.Println("err_length=", err_length)
  831. return
  832. }
  833. if g.Group_outlier_flag {
  834. if length == 0 || length >= delay_limitation {
  835. middle_inactive_list = append(middle_inactive_list, v)
  836. } else {
  837. middle_active_list = append(middle_active_list, v)
  838. }
  839. } else {
  840. if length >= 1 && length <= delay_limitation {
  841. middle_active_list = append(middle_active_list, v)
  842. } else {
  843. middle_inactive_list = append(middle_inactive_list, v)
  844. }
  845. }
  846. }
  847. }
  848. number := float64(0)
  849. //outlier
  850. if g.Group_outlier_flag == true {
  851. number = float64(g.Group_Size*g.Group_Active_Threshold) / float64(100)
  852. }
  853. if g.Group_active {
  854. log.Println("group_id=", g.Group_Id, "group is online, threshold number=", number, "active user number=", len(middle_active_list))
  855. group_active_function(g, middle_active_list, middle_inactive_list)
  856. } else {
  857. log.Println("group_id=", g.Group_Id, "group is offline threshold number=", number, "active user number", len(middle_active_list))
  858. }
  859. log.Println("group_id=", g.Group_Id, "group is", g.Group_active, ",active user number=", len(middle_active_list), "inactive user number=", len(middle_inactive_list), "group_size=", g.Group_Size, "Group_User_List长度=", len(g.Group_User_List))
  860. }
  861. var db12_send_to_receiver_wg sync.WaitGroup
  862. var active_user_channel_closed bool
  863. //same with v1
  864. func db12_send_to_receiver(c net.Conn) {
  865. defer db12_send_to_receiver_wg.Done()
  866. for {
  867. x, ok := <-active_user_channel
  868. if ok {
  869. jsons, err_jsons := json.Marshal(x)
  870. if err_jsons != nil {
  871. log.Println("err_jsons=", err_jsons)
  872. }
  873. c.Write(jsons)
  874. } else {
  875. if active_user_channel_closed {
  876. return
  877. } else {
  878. continue
  879. }
  880. }
  881. }
  882. }
  883. var eliminated_user_channel_closed bool
  884. var online_eliminated_batchList_wg sync.WaitGroup
  885. //same with v1
  886. func online_eliminated_batchList() {
  887. defer online_eliminated_batchList_wg.Done()
  888. time.Sleep(time.Second * 2)
  889. httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002")
  890. if err1 != nil {
  891. log.Println("conn err dial 192.168.32.144:20002 failed", err1)
  892. }
  893. number := 0
  894. eliminated_list := make([]string, 0)
  895. for {
  896. v, ok := <-eliminated_user_channel
  897. if ok {
  898. number++
  899. eliminated_list = append(eliminated_list, v)
  900. } else {
  901. if eliminated_user_channel_closed {
  902. break
  903. } else {
  904. continue
  905. }
  906. }
  907. }
  908. if len(eliminated_list) > 0 {
  909. sendToClient := ""
  910. for i, v := range eliminated_list {
  911. sendToClient = sendToClient + v + ","
  912. if (i%6000 == 0 && i != 0) || i == len(eliminated_list)-1 {
  913. sendToCli := []byte(sendToClient)
  914. httpConn_client.Write(sendToCli)
  915. sendToClient = ""
  916. }
  917. }
  918. time.Sleep(time.Second * 2)
  919. httpConn_client.Write([]byte("changeEnd"))
  920. } else {
  921. httpConn_client.Write([]byte("noChange"))
  922. }
  923. log.Println("len(eliminated_list)", len(eliminated_list), "number=", number)
  924. }
  925. //same with v1
  926. func online_cache_process() {
  927. c, c_err := net.Dial("tcp", "192.168.32.144:20001")
  928. if c_err != nil {
  929. log.Println("online msg db12 conn err dial 192.168.32.144:20001 failed", c_err)
  930. }
  931. active_user_channel_closed = false
  932. eliminated_user_channel_closed = false
  933. for i := 0; i < 10; i++ {
  934. db12_send_to_receiver_wg.Add(1)
  935. go db12_send_to_receiver(c)
  936. }
  937. online_eliminated_batchList_wg.Add(1)
  938. go online_eliminated_batchList()
  939. log.Println("group_list length", len(group_list))
  940. for _, v := range group_list {
  941. each_group_process_wg.Add(1)
  942. each_group_process(v)
  943. }
  944. each_group_process_wg.Wait()
  945. close(active_user_channel)
  946. active_user_channel_closed = true
  947. close(eliminated_user_channel)
  948. eliminated_user_channel_closed = true
  949. db12_send_to_receiver_wg.Wait()
  950. online_eliminated_batchList_wg.Wait()
  951. }
  952. func record_group_size() {
  953. for _, v := range group_list {
  954. group_size_in_each_round[v.Group_Id] = append(group_size_in_each_round[v.Group_Id], v.Group_Size)
  955. }
  956. }
  957. func initMetric() {
  958. Batch_list_threshold = 10000
  959. channelSize = Batch_list_threshold + 10000
  960. round = 0
  961. lastLearningPhase_end_flag = true
  962. packetLength = 3000
  963. k_val = 15
  964. group_Threshold_global = 30
  965. group_Threshold_outlier = 30
  966. delay_limitation = 2
  967. outlier_delay_limitation = 2
  968. Group_sleep_times_threshold = 3
  969. }
  970. func initNameListAndBatchList() {
  971. Batch_List = make([]string, 0)
  972. NameList = make([]string, 0)
  973. number_group_hashmap = make(map[int]*Group)
  974. groupSet_Number = 0
  975. group_list = make([]*Group, 0)
  976. online_msg_in_each_group_in_each_round = make(map[int][]int)
  977. user_group_hashmap = make(map[string]int)
  978. group_size_in_each_round = make(map[int][]int)
  979. err_result = make([]string, 0)
  980. arrival_phase_start_list = make([]int, 0)
  981. average_arrival_length = make([]int, 0)
  982. arrival_phase_number_list = make([]int, 0)
  983. }
  984. func generateLineItems(result []int) []opts.LineData {
  985. items := make([]opts.LineData, 0)
  986. for i := 0; i < len(result); i++ {
  987. items = append(items, opts.LineData{Value: result[i]})
  988. }
  989. return items
  990. }
  991. var err_result []string
  992. var duration int
  993. func main() {
  994. //log
  995. logFile, err := os.OpenFile("log.txt", os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666)
  996. if err != nil {
  997. panic(err)
  998. }
  999. defer logFile.Close()
  1000. mw := io.MultiWriter(os.Stdout, logFile)
  1001. log.SetOutput(mw)
  1002. connID = 0
  1003. var connID int
  1004. listener, err := net.Listen("tcp", "192.168.32.144:20000")
  1005. if err != nil {
  1006. log.Println("start tcp server 192.168.32.144:20000 failed", err)
  1007. return
  1008. }
  1009. initNameListAndBatchList()
  1010. initMetric()
  1011. initDb()
  1012. for i := 0; i < 35; i++ {
  1013. log.Println("--------------------------------------------------the ", i, "th round starts----------------------------------------------")
  1014. eliminated_user_channel = make(chan string, 100000)
  1015. active_user_channel = make(chan string, 100000)
  1016. online_msg_number = 0
  1017. online_msg_number_empty = 0
  1018. round = i
  1019. cov_rec = 0
  1020. real_rec = 0
  1021. conn, err := listener.Accept()
  1022. if err != nil {
  1023. log.Println("accept failed", err)
  1024. return
  1025. }
  1026. connID = connID + 1
  1027. log.Println("connID", connID)
  1028. peocessConn(conn, connID)
  1029. h24 := refresh_learningPhase_flag()
  1030. arrivalToLearn1()
  1031. learning_function(h24)
  1032. learning_To_online_batchList(h24)
  1033. online_msg_in_each_group_in_each_round_middle = make([]int, k_val*(groupSet_Number))
  1034. online_cache_process()
  1035. log.Println("online_msg_number=", online_msg_number, "online_msg_number_empty=", online_msg_number_empty)
  1036. record_group_size()
  1037. log.Println("len(NameList)", len(NameList))
  1038. log.Println("cov_rec,real_rec are", cov_rec, real_rec)
  1039. log.Println("---------------------------------------------------next round---------------------------------------------------")
  1040. }
  1041. ave_groupsize := make([]int, len(group_size_in_each_round[0]))
  1042. for i, v := range group_list {
  1043. log.Println("group_id=", i, "group,group_size is", group_size_in_each_round[v.Group_Id])
  1044. if !v.Group_low_frequence {
  1045. for i, v := range group_size_in_each_round[v.Group_Id] {
  1046. if v == 0 {
  1047. ave_groupsize[i] = ave_groupsize[i] + 1
  1048. } else {
  1049. ave_groupsize[i] = ave_groupsize[i] + v
  1050. }
  1051. }
  1052. }
  1053. log.Println("group_id=", i, "group,online user in each round is", online_msg_in_each_group_in_each_round[v.Group_Id])
  1054. }
  1055. log.Println(err_result)
  1056. log.Println("k_val=", k_val, "duration=", duration)
  1057. log.Println("arrival phase arrival length and users in each batch", arrival_phase_number_list, average_arrival_length)
  1058. sum_number := 0
  1059. sum_length := 0
  1060. sum_duration := time.Duration(0)
  1061. sum_send_times := 0
  1062. sum_online_cover := 0
  1063. for _, v := range group_list {
  1064. if !v.Group_low_frequence {
  1065. log.Println(v.Group_Id, "Group_size_copy=", v.Group_Size_copy, "online sum length", v.Group_online_sum_length)
  1066. sum_length = v.Group_online_sum_length + sum_length
  1067. sum_number = sum_number + v.Group_Size_copy
  1068. sum_duration = sum_duration + v.Group_delay_duration_sum
  1069. sum_send_times = sum_send_times + v.Group_send_times
  1070. sum_online_cover = sum_online_cover + v.Group_cover
  1071. }
  1072. }
  1073. log.Println("average online length=", float64(sum_length)/float64(sum_number), "sum_number", sum_number, "average latency", sum_duration/time.Duration(sum_send_times), "total_cover=,", total_cover, "average cover=", float64(total_cover)/float64(sum_number))
  1074. ave_groupsize_double := make([]float64, len(ave_groupsize))
  1075. for i, v := range ave_groupsize {
  1076. ave_groupsize_double[i] = math.Round((float64(v) / 14.0))
  1077. }
  1078. log.Println("average group size=", ave_groupsize_double)
  1079. }