main.go 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "log"
  8. "net"
  9. "os"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "time"
  14. "github.com/gomodule/redigo/redis"
  15. )
  16. var channelSize int
  17. var Status map[string]int
  18. var status_learning []string
  19. var NameList []string
  20. var IpList map[string]string
  21. var wantToSendHash map[string]int
  22. var packetLength int
  23. func GetConn() redis.Conn {
  24. conn, err := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
  25. if err != nil {
  26. fmt.Println("connect redis error", err)
  27. return nil
  28. }
  29. return conn
  30. }
  31. func interfaceToString(x interface{}) string {
  32. xx, err1 := redis.Strings(x, nil)
  33. if err1 != nil {
  34. fmt.Println("interfaceToString", err1)
  35. fmt.Println("xx=", xx, "xx[0]=", xx[0])
  36. }
  37. xxx := xx[0]
  38. return xxx
  39. }
  40. var times int
  41. func setIp() string {
  42. str := strconv.Itoa(times)
  43. times = times + 1
  44. return str
  45. }
  46. var rejoin_number int
  47. func getIp(uid string) string {
  48. if IpList[uid] != "" {
  49. if Status[uid] != 3 {
  50. return IpList[uid]
  51. } else {
  52. rejoin_number++
  53. Status[uid] = 0
  54. x := setIp()
  55. IpList[uid] = x
  56. db1_key_channel <- uid
  57. db1_val_channel <- x
  58. return x
  59. }
  60. } else {
  61. NameList = append(NameList, uid)
  62. Status[uid] = 0
  63. x := setIp()
  64. IpList[uid] = x
  65. db1_key_channel <- uid
  66. db1_val_channel <- x
  67. return x
  68. }
  69. }
  70. func pushIntoDB2(uid, time, msg string) {
  71. k1 := uid + "_msg"
  72. k2 := uid + "_time"
  73. db2_msg_key_channel <- k1
  74. db2_time_key_channel <- k2
  75. db2_msg_val_channel <- msg
  76. db2_time_val_channel <- time
  77. }
  78. var User_id_queue string
  79. var Msg_queue string
  80. var Timestamp_queue string
  81. var User_Ip string
  82. var db1_key_channel chan string
  83. var db1_val_channel chan string
  84. var mutex_db1 sync.Mutex
  85. func db1_Set_Goroutine() {
  86. defer from_db0_input_wg.Done()
  87. conn := GetConn()
  88. conn.Do("select", 1)
  89. for {
  90. mutex_db1.Lock()
  91. x, ok1 := <-db1_key_channel
  92. y, ok2 := <-db1_val_channel
  93. mutex_db1.Unlock()
  94. if !ok1 || !ok2 {
  95. return
  96. }
  97. conn.Do("set", x, y)
  98. }
  99. }
  100. var db2_time_key_channel chan string
  101. var db2_time_val_channel chan string
  102. var mutex_db2_time sync.Mutex
  103. func db2_Time_Set_Goroutine() {
  104. defer from_db0_input_wg.Done()
  105. conn := GetConn()
  106. conn.Do("select", 2)
  107. for {
  108. mutex_db2_time.Lock()
  109. x, ok1 := <-db2_time_key_channel
  110. y, ok2 := <-db2_time_val_channel
  111. mutex_db2_time.Unlock()
  112. if !ok1 || !ok2 {
  113. return
  114. }
  115. conn.Do("lpush", x, y)
  116. }
  117. }
  118. var db2_msg_key_channel chan string
  119. var db2_msg_val_channel chan string
  120. var mutex_db2_msg sync.Mutex
  121. func db2_Msg_Set_Goroutine() {
  122. defer from_db0_input_wg.Done()
  123. conn := GetConn()
  124. conn.Do("select", 2)
  125. for {
  126. mutex_db2_msg.Lock()
  127. x, ok1 := <-db2_msg_key_channel
  128. y, ok2 := <-db2_msg_val_channel
  129. mutex_db2_msg.Unlock()
  130. if !ok1 || !ok2 {
  131. return
  132. }
  133. conn.Do("lpush", x, y)
  134. }
  135. }
  136. // Exactly the same as in v1
  137. func test2(finalTime string, lastTimesIndex int) (finalIndex int) {
  138. fmt.Println("text2()")
  139. db1_key_channel = make(chan string, channelSize)
  140. db1_val_channel = make(chan string, channelSize)
  141. db2_time_key_channel = make(chan string, channelSize)
  142. db2_time_val_channel = make(chan string, channelSize)
  143. db2_msg_key_channel = make(chan string, channelSize)
  144. db2_msg_val_channel = make(chan string, channelSize)
  145. wantToSendHash = make(map[string]int)
  146. conn := GetConn()
  147. for i := lastTimesIndex; ; i++ {
  148. time1, err1 := conn.Do("hmget", i, "Timestamp")
  149. if err1 != nil {
  150. fmt.Println("Timestamp hmget err1=", err1)
  151. }
  152. Timestamp_queue = interfaceToString(time1)
  153. if Timestamp_queue > finalTime {
  154. finalIndex = i
  155. fmt.Println("i", i)
  156. break
  157. } else {
  158. u_id, err := conn.Do("hmget", i, "User_id")
  159. if err != nil {
  160. fmt.Println("u_id hmget ,err1=", err)
  161. }
  162. User_id_queue = interfaceToString(u_id)
  163. wantToSendHash[User_id_queue] = 1
  164. messages, err2 := conn.Do("hmget", i, "Hashtags")
  165. if err2 != nil {
  166. fmt.Println("message hmget ,err2=", err2)
  167. }
  168. Msg_queue = interfaceToString(messages)
  169. User_Ip = getIp(User_id_queue)
  170. pushIntoDB2(User_id_queue, Timestamp_queue, Msg_queue)
  171. }
  172. }
  173. for i := 0; i < 130; i++ {
  174. from_db0_input_wg.Add(3)
  175. go db1_Set_Goroutine()
  176. go db2_Time_Set_Goroutine()
  177. go db2_Msg_Set_Goroutine()
  178. }
  179. close(db1_key_channel)
  180. close(db1_val_channel)
  181. close(db2_time_key_channel)
  182. close(db2_time_val_channel)
  183. close(db2_msg_key_channel)
  184. close(db2_msg_val_channel)
  185. from_db0_input_wg.Wait()
  186. fmt.Println("text2() ends")
  187. return
  188. }
  189. var real_msg_channel chan string
  190. var cover_msg_channel chan string
  191. type megred struct {
  192. Megred_Timestamp []string
  193. Megred_Hashtags []string
  194. Megred_User_Id string
  195. Megred_Ip string
  196. Cover_Msg string
  197. }
  198. var from_db0_input_wg sync.WaitGroup
  199. //Exactly the same as in v1
  200. func putWantToSendHashIntoChannel(real_msg_channel chan<- string) {
  201. fmt.Println("wantToSendHash", len(wantToSendHash))
  202. for i := range wantToSendHash {
  203. real_msg_channel <- i
  204. }
  205. }
  206. //Exactly the same as in v1
  207. func putStatus_learningIntoChannel(cover_msg_channel chan<- string) {
  208. fmt.Println("status_learning", len(status_learning))
  209. for _, v := range status_learning {
  210. cover_msg_channel <- v
  211. }
  212. }
  213. //Exactly the same as in v1
  214. func getHttpConnection() net.Conn {
  215. httpConn, err := net.Dial("tcp", "192.168.32.144:20000")
  216. if err != nil {
  217. fmt.Println("conn errdial 192.168.32.144:20000 failed", err)
  218. return nil
  219. }
  220. return httpConn
  221. }
  222. ////Exactly the same as in v1
  223. func merge(v string, conn redis.Conn) *megred {
  224. conn.Do("select", "2")
  225. a := v + "_time"
  226. b := v + "_msg"
  227. l1, err_L1 := conn.Do("LLEN", a)
  228. if err_L1 != nil {
  229. fmt.Println("err_L1=", err_L1)
  230. }
  231. length1, err_length1 := redis.Int(l1, nil)
  232. if err_length1 != nil {
  233. fmt.Println("err_length1=", err_length1)
  234. }
  235. l2, err_L2 := conn.Do("LLEN", b)
  236. if err_L2 != nil {
  237. fmt.Println("err_L2=", err_L2)
  238. }
  239. length2, err_length2 := redis.Int(l2, nil)
  240. if err_length2 != nil {
  241. fmt.Println("err_length2=", err_length2)
  242. }
  243. time_middle, _ := conn.Do("lrange", a, 0, length1-1)
  244. x, err_x := redis.Strings(time_middle, nil)
  245. if err_x != nil {
  246. fmt.Println("err_x=", err_x)
  247. }
  248. aa := x
  249. msg_middle, _ := conn.Do("lrange", b, 0, length2-1)
  250. y, errr_y := redis.Strings(msg_middle, nil)
  251. if errr_y != nil {
  252. fmt.Println("errr_y=", errr_y)
  253. }
  254. bb := y
  255. for i := 0; i < length1; i++ {
  256. conn.Do("rpop", a)
  257. }
  258. for i := 0; i < length2; i++ {
  259. conn.Do("rpop", b)
  260. }
  261. ip_middle := IpList[v]
  262. merged_result := &megred{
  263. aa,
  264. bb,
  265. v,
  266. ip_middle,
  267. "no",
  268. }
  269. return merged_result
  270. }
  271. var cov_lock sync.Mutex
  272. var covNumber int
  273. var real_lock sync.Mutex
  274. var realNumber int
  275. var arrival_msg_number int
  276. var learning_msg_number int
  277. var online_msg_number int
  278. var learning_msg_number_wg sync.Mutex
  279. var online_msg_number_wg sync.Mutex
  280. var test_list []string
  281. //Unlike in V1, each user has the structure
  282. type users struct {
  283. Uid string
  284. Bonus int //only used for bonus-based V3
  285. Chance int
  286. Queue [][]byte
  287. Group *group
  288. Used_cover int
  289. Used_delay int
  290. }
  291. type group struct {
  292. Group_id int
  293. Users_list []*users
  294. }
  295. //about pattern()
  296. var k_val int
  297. var batch int
  298. var count_cover_delay_lock sync.Mutex
  299. var total_used_cover int
  300. var total_used_delay int
  301. var int_to_group map[int]*group
  302. var grouplist []*group
  303. var online_users_hash map[string]*users
  304. var online_users_hash_lock sync.Mutex
  305. var users_channel chan *users
  306. var users_channel_wg sync.WaitGroup
  307. var eliminated_users_list []string
  308. var allow_hash map[string]bool
  309. //Communicate with agent. The process used to determine the list of users who need to send messages, i.e. the non-fixed scheduler
  310. func before_finalsend() {
  311. c1, c_err := net.Dial("tcp", "192.168.32.144:20010") //agent address
  312. if c_err != nil {
  313. log.Println("err,,dial 192.168.32.144:20010 failed", c_err)
  314. }
  315. wantToSend_list := make([]string, 0)
  316. //1,send the active users list to agent
  317. for k := range online_users_hash {
  318. if wantToSendHash[k] == 1 {
  319. wantToSend_list = append(wantToSend_list, k)
  320. }
  321. }
  322. if len(wantToSend_list) > 0 {
  323. sendToAgent := ""
  324. for i, v := range wantToSend_list {
  325. sendToAgent = sendToAgent + v + ","
  326. if (i%6000 == 0 && i != 0) || i == len(wantToSend_list)-1 {
  327. sendToCli := []byte(sendToAgent)
  328. c1.Write(sendToCli)
  329. sendToAgent = ""
  330. }
  331. }
  332. time.Sleep(time.Second * 2)
  333. c1.Write([]byte("changeEnd"))
  334. } else {
  335. c1.Write([]byte("noChange"))
  336. }
  337. //2,get feedback from agent
  338. listener, err := net.Listen("tcp", "192.168.32.144:20012")
  339. if err != nil {
  340. fmt.Println("err 192.168.32.144:20012 failed", err)
  341. return
  342. }
  343. conn, err := listener.Accept()
  344. if err != nil {
  345. fmt.Println("conn err before()", err)
  346. return
  347. }
  348. var tmp [10000]byte
  349. res := ""
  350. for {
  351. n, err := conn.Read(tmp[:])
  352. if err != nil {
  353. fmt.Println("read from conn failed", err)
  354. break
  355. }
  356. //no feedback
  357. if string(tmp[:8]) == "noChange" {
  358. listener.Close()
  359. break
  360. } else {
  361. result := string(tmp[:n])
  362. res = res + result
  363. if string(tmp[:9]) == "changeEnd" {
  364. fmt.Println("receive feedback end")
  365. listener.Close()
  366. break
  367. }
  368. }
  369. }
  370. res = strings.Replace(res, " ", "", -1)
  371. res = strings.Replace(res, "\n", "", -1)
  372. str_arr := strings.Split(res, ",")
  373. for _, str := range str_arr {
  374. allow_hash[str] = true
  375. }
  376. log.Println("before ends", len(str_arr)-1)
  377. }
  378. //Communicate with agent. The process used to determine the list of users who need to send messages, i.e. the non-fixed scheduler
  379. func pattern() {
  380. log.Println("pattern")
  381. filePath1 := "/home/it/middle_data/filtered_good_name_list.txt"
  382. content1, err1 := ioutil.ReadFile(filePath1)
  383. if err1 != nil {
  384. fmt.Println("err1,", err1)
  385. panic(err1)
  386. }
  387. m1 := strings.Split(string(content1), "\n")
  388. filePath2 := "/home/it/middle_data/groupingResult.txt"
  389. content2, err2 := ioutil.ReadFile(filePath2)
  390. if err2 != nil {
  391. fmt.Println("err2", err2)
  392. panic(err2)
  393. }
  394. m2 := strings.Split(string(content2), "\n")
  395. for i := 0; i < k_val-1; i++ {
  396. var g group
  397. g.Group_id = i + batch*k_val
  398. g.Users_list = make([]*users, 0)
  399. int_to_group[i] = &g
  400. grouplist = append(grouplist, &g)
  401. }
  402. //creat structure for each user
  403. for i := 0; i < len(m2)-1; i++ {
  404. var u users
  405. u.Uid = m1[i]
  406. u.Chance = 10
  407. u.Bonus = 0
  408. u.Queue = make([][]byte, 0)
  409. middle, _ := strconv.Atoi(m2[i])
  410. u.Group = int_to_group[middle]
  411. u.Used_cover = 0
  412. u.Used_delay = 0
  413. int_to_group[middle].Users_list = append(int_to_group[middle].Users_list, &u)
  414. online_users_hash_lock.Lock()
  415. online_users_hash[u.Uid] = &u
  416. online_users_hash_lock.Unlock()
  417. }
  418. log.Println("pattern ends", len(m2)-1)
  419. }
  420. //Prepare for the sending of all users with scheduler
  421. func send_online(c net.Conn) {
  422. defer log.Println("send_online ends")
  423. users_channel = make(chan *users, 100000)
  424. log.Println("send_online")
  425. for _, v := range online_users_hash {
  426. if v == nil {
  427. continue
  428. }
  429. if v.Group.Group_id%k_val != k_val-2 {
  430. users_channel <- v
  431. }
  432. }
  433. close(users_channel)
  434. for i := 0; i < 1000; i++ {
  435. users_channel_wg.Add(1)
  436. go send_each_user(c)
  437. }
  438. users_channel_wg.Wait()
  439. }
  440. //send func for each user with non-fixed scheduer;
  441. //Determine the sending behavior of each user by the number of feedbacks and chances at the agent
  442. func send_each_user(c net.Conn) {
  443. defer users_channel_wg.Done()
  444. for {
  445. u, ok := <-users_channel
  446. if !ok {
  447. return
  448. } else {
  449. if allow_hash[u.Uid] {
  450. if len(u.Queue) != 0 {
  451. x := u.Queue[0]
  452. u.Queue = u.Queue[1:]
  453. c.Write(x)
  454. u.Bonus++
  455. if u.Bonus >= 10 {
  456. u.Bonus = 0
  457. if u.Chance < 3 { //bonus-based or chance-based
  458. u.Chance = u.Chance + 0
  459. }
  460. }
  461. continue
  462. } else {
  463. if u.Chance > 0 {
  464. cover := &megred{
  465. []string{},
  466. []string{},
  467. "",
  468. "cover",
  469. "yes",
  470. }
  471. cov_mid := *cover
  472. cov_mid.Megred_User_Id = u.Uid
  473. cov_mid.Cover_Msg = "yes"
  474. cov_mid.Megred_Hashtags = []string{"[cover]"}
  475. cov_mid.Megred_Ip = IpList[u.Uid]
  476. cov_mid.Megred_Timestamp = []string{"000000000"}
  477. jsons, err_json := json.Marshal(cov_mid)
  478. if err_json != nil {
  479. fmt.Println("err_json=", err_json)
  480. }
  481. total_length := packetLength
  482. padding_length := total_length - len(jsons)
  483. pd := make([]byte, padding_length)
  484. jsons = append(jsons, pd...)
  485. if len(jsons) != packetLength {
  486. fmt.Println("cover=", jsons)
  487. }
  488. c.Write(jsons)
  489. u.Chance = u.Chance - 1
  490. u.Used_cover++
  491. continue
  492. } else {
  493. continue
  494. }
  495. }
  496. } else {
  497. if len(u.Queue) > 0 {
  498. if u.Chance > 0 {
  499. u.Chance = u.Chance - 1
  500. u.Used_delay++
  501. continue
  502. } else {
  503. x := u.Queue[0]
  504. u.Queue = u.Queue[1:]
  505. c.Write(x)
  506. continue
  507. }
  508. } else {
  509. continue
  510. }
  511. }
  512. }
  513. }
  514. }
  515. //Similar to v1, but only the merged message is cached in the golden scheduler queue;
  516. //whether to send it or not is determined by the above function "send_each_user"
  517. func sendOut_real(list <-chan string, c net.Conn, cover *megred) {
  518. defer finalSend_wg_real.Done()
  519. time.Sleep(time.Millisecond * 100)
  520. conn := GetConn()
  521. conn.Do("select", "2")
  522. for {
  523. v, ok := <-list
  524. if !ok {
  525. return
  526. }
  527. status_v := Status[v]
  528. switch status_v {
  529. case 0:
  530. arrival_msg_number++
  531. case 1:
  532. learning_msg_number_wg.Lock()
  533. learning_msg_number++
  534. learning_msg_number_wg.Unlock()
  535. case 2:
  536. online_msg_number_wg.Lock()
  537. online_msg_number++
  538. test_list = append(test_list, v)
  539. online_msg_number_wg.Unlock()
  540. }
  541. rst := merge(v, conn)
  542. jsons, err_json := json.Marshal(*rst)
  543. if err_json != nil {
  544. fmt.Println("err_json=", err_json)
  545. }
  546. total_length := packetLength
  547. padding_length := total_length - len(jsons)
  548. if jsons[len(jsons)-1] != 125 {
  549. fmt.Println("rst=", rst)
  550. break
  551. }
  552. if padding_length <= 0 {
  553. fmt.Println("longer than packetLength,v=", v, "jsons length", len(jsons), "use cover instead of it")
  554. cov_mid := *cover
  555. cov_mid.Megred_User_Id = v
  556. cov_mid.Cover_Msg = "no"
  557. cov_mid.Megred_Hashtags = []string{"[other]"}
  558. cov_mid.Megred_Ip = IpList[v]
  559. cov_mid.Megred_Timestamp = []string{"000000000"}
  560. jsons, err_json := json.Marshal(cov_mid)
  561. if err_json != nil {
  562. fmt.Println("err_json=", err_json)
  563. }
  564. total_length := packetLength
  565. padding_length := total_length - len(jsons)
  566. pd := make([]byte, padding_length)
  567. jsons = append(jsons, pd...)
  568. if len(jsons) != packetLength {
  569. fmt.Println("cover=", jsons)
  570. }
  571. if status_v != 2 || online_users_hash[v] == nil {
  572. c.Write(jsons)
  573. real_lock.Lock()
  574. realNumber++
  575. real_lock.Unlock()
  576. continue
  577. }
  578. //online
  579. if status_v == 2 {
  580. if online_users_hash[v].Group.Group_id%k_val != k_val-2 {
  581. online_users_hash_lock.Lock()
  582. middle_user := online_users_hash[v]
  583. online_users_hash_lock.Unlock()
  584. middle_user.Queue = append(middle_user.Queue, jsons)
  585. continue
  586. } else { //outliers
  587. c.Write(jsons)
  588. real_lock.Lock()
  589. realNumber++
  590. real_lock.Unlock()
  591. continue
  592. }
  593. }
  594. continue
  595. }
  596. pd := make([]byte, padding_length)
  597. jsons = append(jsons, pd...)
  598. if len(jsons) != packetLength {
  599. fmt.Println("err,rst=", rst)
  600. }
  601. //low frequence
  602. if status_v != 2 || online_users_hash[v] == nil {
  603. c.Write(jsons)
  604. real_lock.Lock()
  605. realNumber++
  606. real_lock.Unlock()
  607. continue
  608. }
  609. //online
  610. if status_v == 2 { //normal users
  611. if online_users_hash[v].Group.Group_id%k_val != k_val-2 {
  612. online_users_hash_lock.Lock()
  613. middle_user := online_users_hash[v]
  614. online_users_hash_lock.Unlock()
  615. middle_user.Queue = append(middle_user.Queue, jsons)
  616. continue
  617. } else { //outliers
  618. c.Write(jsons)
  619. real_lock.Lock()
  620. realNumber++
  621. real_lock.Unlock()
  622. continue
  623. }
  624. }
  625. }
  626. }
  627. //Similar to v1
  628. func sendOut_cover(list <-chan string, c net.Conn, cover *megred) {
  629. defer finalSend_wg_cov.Done()
  630. time.Sleep(time.Millisecond * 100)
  631. for {
  632. v, ok := <-list
  633. if !ok {
  634. break
  635. } else {
  636. if wantToSendHash[v] == 1 {
  637. continue
  638. } else {
  639. status_v := Status[v]
  640. switch status_v {
  641. case 1:
  642. learning_msg_number_wg.Lock()
  643. learning_msg_number++
  644. learning_msg_number_wg.Unlock()
  645. }
  646. cov_mid := *cover
  647. cov_mid.Megred_User_Id = v
  648. cov_mid.Cover_Msg = "yes"
  649. cov_mid.Megred_Hashtags = []string{"[cover]"}
  650. cov_mid.Megred_Ip = IpList[v]
  651. cov_mid.Megred_Timestamp = []string{"000000000"}
  652. jsons, err_json := json.Marshal(cov_mid)
  653. if err_json != nil {
  654. fmt.Println("err_json=", err_json)
  655. }
  656. total_length := packetLength
  657. padding_length := total_length - len(jsons)
  658. pd := make([]byte, padding_length)
  659. jsons = append(jsons, pd...)
  660. if len(jsons) != packetLength {
  661. fmt.Println("err,cover=", jsons)
  662. }
  663. c.Write(jsons)
  664. cov_lock.Lock()
  665. covNumber++
  666. cov_lock.Unlock()
  667. }
  668. }
  669. }
  670. }
  671. //same with v1
  672. func roundEnd(c net.Conn) {
  673. x := []byte("roundEnd")
  674. total_length := packetLength
  675. padding_length := total_length - len(x)
  676. pd := make([]byte, padding_length)
  677. x = append(x, pd...)
  678. if len(x) != packetLength {
  679. fmt.Println("err,x=", x)
  680. }
  681. c.Write(x)
  682. }
  683. var finalSend_wg_cov sync.WaitGroup
  684. var finalSend_wg_real sync.WaitGroup
  685. //similar with v1, but from scheduler queue
  686. func finalSend(c net.Conn) {
  687. //创建cover msg
  688. cover := &megred{
  689. []string{},
  690. []string{},
  691. "",
  692. "cover",
  693. "yes",
  694. }
  695. real_msg_channel = make(chan string)
  696. cover_msg_channel = make(chan string)
  697. for i := 0; i < 1000; i++ {
  698. finalSend_wg_real.Add(1)
  699. finalSend_wg_cov.Add(1)
  700. go sendOut_real(real_msg_channel, c, cover)
  701. go sendOut_cover(cover_msg_channel, c, cover)
  702. }
  703. putWantToSendHashIntoChannel(real_msg_channel)
  704. putStatus_learningIntoChannel(cover_msg_channel)
  705. close(real_msg_channel)
  706. close(cover_msg_channel)
  707. finalSend_wg_real.Wait()
  708. finalSend_wg_cov.Wait()
  709. time.Sleep(time.Millisecond * 100)
  710. send_online(c)
  711. roundEnd(c)
  712. }
  713. func initDB1_DB2() {
  714. conn := GetConn()
  715. conn.Do("select", "1")
  716. conn.Do("flushdb")
  717. conn.Do("keys *")
  718. conn.Do("select", "2")
  719. conn.Do("flushdb")
  720. conn.Do("keys *")
  721. }
  722. func start() {
  723. initDB1_DB2()
  724. times = 0
  725. IpList = make(map[string]string)
  726. Status = make(map[string]int)
  727. NameList = make([]string, 0)
  728. status_learning = make([]string, 0)
  729. packetLength = 3000
  730. k_val = 15
  731. batch = 0
  732. int_to_group = make(map[int]*group)
  733. grouplist = make([]*group, 0)
  734. total_used_cover = 0
  735. total_used_delay = 0
  736. online_users_hash = make(map[string]*users)
  737. eliminated_users_list = make([]string, 0)
  738. }
  739. //same with v1
  740. func timeAddOneHour(input string) (outpute string) {
  741. loc, err_loc := time.LoadLocation("Local")
  742. if err_loc != nil {
  743. fmt.Println("err_loc=", err_loc)
  744. }
  745. the_time, err_the_time := time.ParseInLocation("20060102150405", input, loc)
  746. if err_the_time != nil {
  747. fmt.Println("err_the_time=", err_the_time)
  748. }
  749. h, _ := time.ParseDuration("1h")
  750. h1 := the_time.Add(1 * h)
  751. //h, _ := time.ParseDuration("1m")
  752. //h1 := the_time.Add(30 * h)
  753. //h, _ := time.ParseDuration("1h")
  754. //h1 := the_time.Add(2 * h)
  755. timeString := h1.Format("20060102150405")
  756. outpute = timeString
  757. return
  758. }
  759. var learning_phase_start_round int
  760. var round int
  761. //same with v1
  762. func StatusChangeOrNot_arrival_to_learning() {
  763. defer log.Println("StatusChangeOrNot_arrival_to_learning() ends")
  764. fmt.Println("StatusChangeOrNot_arrival_to_learning() starts")
  765. listener, err := net.Listen("tcp", "192.168.32.144:20002")
  766. if err != nil {
  767. fmt.Println("listen 192.168.32.144:20002 failed", err)
  768. return
  769. }
  770. conn, err := listener.Accept()
  771. if err != nil {
  772. fmt.Println("conn err", err)
  773. return
  774. }
  775. var tmp [10000]byte
  776. res := ""
  777. for {
  778. n, err := conn.Read(tmp[:])
  779. if err != nil {
  780. fmt.Println("read from conn failed", err)
  781. break
  782. }
  783. if string(tmp[:8]) == "noChange" {
  784. log.Println("no changed status")
  785. listener.Close()
  786. return
  787. } else {
  788. result := string(tmp[:n])
  789. res = res + result
  790. if string(tmp[:9]) == "changeEnd" {
  791. listener.Close()
  792. break
  793. }
  794. }
  795. }
  796. res = strings.Replace(res, " ", "", -1)
  797. res = strings.Replace(res, "\n", "", -1)
  798. str_arr := strings.Split(res, ",")
  799. learning_phase_start_round = round
  800. for _, str := range str_arr {
  801. Status[str] = 1
  802. status_learning = append(status_learning, str)
  803. }
  804. fmt.Println("status_learning length=", len(status_learning))
  805. }
  806. //same with v1
  807. func StatusChangeOrNot_eliminated() {
  808. defer fmt.Println("StatusChangeOrNot_eliminated ends")
  809. log.Println("StatusChangeOrNot_eliminated() starts")
  810. listener, err := net.Listen("tcp", "192.168.32.144:20002")
  811. if err != nil {
  812. fmt.Println("StatusChangeOrNot_eliminated() listen 192.168.32.144:20002 failed", err)
  813. return
  814. }
  815. conn, err := listener.Accept()
  816. if err != nil {
  817. fmt.Println("conn err", err)
  818. return
  819. }
  820. var tmp [10000]byte
  821. res := ""
  822. for {
  823. n, err := conn.Read(tmp[:])
  824. if err != nil {
  825. fmt.Println("read from conn failed", err)
  826. break
  827. }
  828. if string(tmp[:8]) == "noChange" {
  829. fmt.Println("no changed status")
  830. listener.Close()
  831. return
  832. } else {
  833. result := string(tmp[:n])
  834. res = res + result
  835. if string(tmp[:9]) == "changeEnd" {
  836. listener.Close()
  837. break
  838. }
  839. }
  840. }
  841. res = strings.Replace(res, " ", "", -1)
  842. res = strings.Replace(res, "\n", "", -1)
  843. str_arr := strings.Split(res, ",")
  844. for _, str := range str_arr {
  845. Status[str] = 3
  846. online_users_hash_lock.Lock()
  847. middle_u := online_users_hash[str]
  848. online_users_hash_lock.Unlock()
  849. delete(online_users_hash, str)
  850. if middle_u == nil {
  851. continue
  852. }
  853. if middle_u.Group.Group_id%k_val == k_val-2 {
  854. continue
  855. }
  856. count_cover_delay_lock.Lock()
  857. total_used_cover = total_used_cover + middle_u.Used_cover
  858. total_used_delay = total_used_delay + middle_u.Used_delay
  859. count_cover_delay_lock.Unlock()
  860. }
  861. log.Println("eliminated users number", len(str_arr))
  862. }
  863. //same with v1
  864. func refresh_status_learning() {
  865. if round-learning_phase_start_round == 24 {
  866. fmt.Println("进入refresh_status_learning")
  867. fmt.Println("len(status_learning)=", len(status_learning))
  868. status_learning = make([]string, 0)
  869. }
  870. }
  871. //same with v1
  872. func StatusChangeOrNot_learning_to_online() {
  873. defer log.Println("StatusChangeOrNot_learning_to_online() ends")
  874. log.Println("StatusChangeOrNot_learning_to_online() starts")
  875. listener, err := net.Listen("tcp", "192.168.32.144:20002")
  876. if err != nil {
  877. fmt.Println("StatusChangeOrNot_learning_to_online() listen 192.168.32.144:20002 failed", err)
  878. return
  879. }
  880. conn, err := listener.Accept()
  881. if err != nil {
  882. fmt.Println("accept failed", err)
  883. return
  884. }
  885. var tmp [10000]byte
  886. res := ""
  887. for {
  888. n, err := conn.Read(tmp[:])
  889. if err != nil {
  890. fmt.Println("read from conn failed", err)
  891. break
  892. }
  893. if string(tmp[:8]) == "noChange" {
  894. log.Println("no changed status")
  895. listener.Close()
  896. return
  897. } else {
  898. result := string(tmp[:n])
  899. res = res + result
  900. if string(tmp[:9]) == "changeEnd" {
  901. listener.Close()
  902. break
  903. }
  904. }
  905. }
  906. res = strings.Replace(res, " ", "", -1)
  907. res = strings.Replace(res, "\n", "", -1)
  908. str_arr := strings.Split(res, ",")
  909. number := 0
  910. for i := 0; i < len(str_arr)-1; i++ {
  911. Status[str_arr[i]] = 2
  912. number++
  913. }
  914. fmt.Println("changed status numbers=", number)
  915. pattern()
  916. batch = batch + 1
  917. fmt.Println("someone has changed")
  918. }
  919. func double(value float64) float64 {
  920. value, _ = strconv.ParseFloat(fmt.Sprintf("%.2f", value), 64)
  921. return value
  922. }
  923. func main() {
  924. //log
  925. logFile, err := os.OpenFile("log.txt", os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666)
  926. if err != nil {
  927. panic(err)
  928. }
  929. defer logFile.Close()
  930. mw := io.MultiWriter(os.Stdout, logFile)
  931. log.SetOutput(mw)
  932. start()
  933. channelSize = 150000
  934. //1st test point
  935. lastTimesIndex := 0
  936. roundEndTime := "20121101130000"
  937. //2ed test point
  938. //lastTimesIndex := 1479975
  939. //roundEndTime := "20121110130000"
  940. //3rt test point
  941. //lastTimesIndex := 3751916
  942. //roundEndTime := "20121120130000"
  943. for i := 0; i < 35; i++ {
  944. log.Println("------------------------------------------------------------the", i, "th round starts------------------------------------------------------------------")
  945. c := getHttpConnection()
  946. allow_hash = make(map[string]bool)
  947. arrival_msg_number = 0
  948. learning_msg_number = 0
  949. online_msg_number = 0
  950. rejoin_number = 0
  951. round = i
  952. covNumber = 0
  953. realNumber = 0
  954. lastTimesIndex = test2(roundEndTime, lastTimesIndex)
  955. fmt.Println("lastTimesIndex=", lastTimesIndex)
  956. roundEndTime = timeAddOneHour(roundEndTime)
  957. fmt.Println("roundEndTime=", roundEndTime)
  958. fmt.Println("status length", len(Status), "NameList length", len(NameList), "status_learning length", len(status_learning))
  959. before_finalsend()
  960. finalSend(c)
  961. refresh_status_learning()
  962. fmt.Println("Status length", len(Status))
  963. StatusChangeOrNot_arrival_to_learning()
  964. StatusChangeOrNot_learning_to_online()
  965. StatusChangeOrNot_eliminated()
  966. fmt.Println("covNumber=", covNumber)
  967. fmt.Println("realNumber=", realNumber)
  968. fmt.Println("arrival_msg_number=", arrival_msg_number, "learning_msg_number=", learning_msg_number, "online_msg_number=", online_msg_number)
  969. log.Println("rejoin_number", rejoin_number)
  970. log.Println("------------------------------------------------------------the", i, "th round ends------------------------------------------------------------------")
  971. }
  972. log.Println("total_used_cover,total_used_delay=", total_used_cover, total_used_delay)
  973. used_delay_in_each_group := make([]int, 0)
  974. used_cover_in_each_group := make([]int, 0)
  975. queue_length := make([]int, 0)
  976. users_number := make([]int, 0)
  977. total_user_number := 0
  978. for _, v := range grouplist {
  979. a := 0
  980. b := 0
  981. c := 0
  982. for _, m := range v.Users_list {
  983. a = a + m.Used_delay
  984. b = b + m.Used_cover
  985. c = c + len(m.Queue)
  986. }
  987. used_delay_in_each_group = append(used_delay_in_each_group, a)
  988. used_cover_in_each_group = append(used_cover_in_each_group, b)
  989. queue_length = append(queue_length, c)
  990. users_number = append(users_number, len(v.Users_list))
  991. }
  992. log.Println("users_number", users_number)
  993. log.Println("used_delay_in_each_group", used_delay_in_each_group)
  994. log.Println("used_cover_in_each_group", used_cover_in_each_group)
  995. sum := 0
  996. for _, v := range used_cover_in_each_group {
  997. sum = sum + v
  998. }
  999. log.Println("cover sum=", sum)
  1000. log.Println("len(queue)", queue_length)
  1001. ave_delay := make([]float64, 0)
  1002. ave_cover := make([]float64, 0)
  1003. ave_queue_length := make([]float64, 0)
  1004. total_delay := float64(0)
  1005. total_cover := float64(0)
  1006. total_queue := float64(0)
  1007. for i := range grouplist {
  1008. total_delay = total_delay + float64(used_delay_in_each_group[i])
  1009. total_cover = total_cover + float64(used_cover_in_each_group[i])
  1010. total_queue = total_queue + float64(queue_length[i])
  1011. }
  1012. for i := range grouplist {
  1013. a := double(float64(used_delay_in_each_group[i]) / float64(users_number[i]))
  1014. b := double(float64(used_cover_in_each_group[i]) / float64(users_number[i]))
  1015. c := double(float64(queue_length[i]) / float64(users_number[i]))
  1016. ave_delay = append(ave_delay, a)
  1017. ave_cover = append(ave_cover, b)
  1018. ave_queue_length = append(ave_queue_length, c)
  1019. }
  1020. for i, v := range users_number {
  1021. if i != k_val-1 && i != k_val-2 {
  1022. total_user_number = total_user_number + v
  1023. }
  1024. }
  1025. aa := double(total_delay / float64(total_user_number))
  1026. bb := double(total_cover / float64(total_user_number))
  1027. cc := double(total_queue / float64(total_user_number))
  1028. log.Println("average delay,cover,queue are", aa, bb, cc, "eliminated users number", total_user_number)
  1029. }