// main.go
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "io/ioutil"
  7. "log"
  8. "net"
  9. "os"
  10. "strconv"
  11. "strings"
  12. "sync"
  13. "time"
  14. "github.com/gomodule/redigo/redis"
  15. )
  16. var channelSize int
  17. var Status map[string]int
  18. var status_learning []string
  19. var NameList []string
  20. var IpList map[string]string
  21. var wantToSendHash map[string]int
  22. var packetLength int
  23. func GetConn() redis.Conn {
  24. conn, err := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
  25. if err != nil {
  26. fmt.Println("connect redis error redis err", err)
  27. return nil
  28. }
  29. return conn
  30. }
  31. func interfaceToString(x interface{}) string {
  32. xx, err1 := redis.Strings(x, nil)
  33. if err1 != nil {
  34. fmt.Println("interfaceToString err", err1)
  35. fmt.Println("xx=", xx, "xx[0]=", xx[0])
  36. }
  37. xxx := xx[0]
  38. return xxx
  39. }
  40. var times int
  41. func setIp() string {
  42. str := strconv.Itoa(times)
  43. times = times + 1
  44. return str
  45. }
  46. var rejoin_number int
  47. func getIp(uid string) string {
  48. if IpList[uid] != "" {
  49. if Status[uid] != 3 {
  50. return IpList[uid]
  51. } else {
  52. rejoin_number++
  53. Status[uid] = 0
  54. x := setIp()
  55. IpList[uid] = x
  56. db1_key_channel <- uid
  57. db1_val_channel <- x
  58. return x
  59. }
  60. } else {
  61. NameList = append(NameList, uid)
  62. Status[uid] = 0
  63. x := setIp()
  64. IpList[uid] = x
  65. db1_key_channel <- uid
  66. db1_val_channel <- x
  67. return x
  68. }
  69. }
  70. //db2
  71. func pushIntoDB2(uid, time, msg string) {
  72. k1 := uid + "_msg"
  73. k2 := uid + "_time"
  74. db2_msg_key_channel <- k1
  75. db2_time_key_channel <- k2
  76. db2_msg_val_channel <- msg
  77. db2_time_val_channel <- time
  78. }
  79. var User_id_queue string
  80. var Msg_queue string
  81. var Timestamp_queue string
  82. var User_Ip string
  83. var db1_key_channel chan string
  84. var db1_val_channel chan string
  85. var mutex_db1 sync.Mutex
  86. //db1
  87. func db1_Set_Goroutine() {
  88. defer from_db0_input_wg.Done()
  89. conn := GetConn()
  90. conn.Do("select", 1)
  91. for {
  92. mutex_db1.Lock()
  93. x, ok1 := <-db1_key_channel
  94. y, ok2 := <-db1_val_channel
  95. mutex_db1.Unlock()
  96. if !ok1 || !ok2 {
  97. return
  98. }
  99. conn.Do("set", x, y)
  100. }
  101. }
  102. var db2_time_key_channel chan string
  103. var db2_time_val_channel chan string
  104. var mutex_db2_time sync.Mutex
  105. //db2
  106. func db2_Time_Set_Goroutine() {
  107. defer from_db0_input_wg.Done()
  108. conn := GetConn()
  109. conn.Do("select", 2)
  110. for {
  111. mutex_db2_time.Lock()
  112. x, ok1 := <-db2_time_key_channel
  113. y, ok2 := <-db2_time_val_channel
  114. mutex_db2_time.Unlock()
  115. if !ok1 || !ok2 {
  116. return
  117. }
  118. conn.Do("lpush", x, y)
  119. }
  120. }
  121. var db2_msg_key_channel chan string
  122. var db2_msg_val_channel chan string
  123. var mutex_db2_msg sync.Mutex
  124. func db2_Msg_Set_Goroutine() {
  125. defer from_db0_input_wg.Done()
  126. conn := GetConn()
  127. conn.Do("select", 2)
  128. for {
  129. mutex_db2_msg.Lock()
  130. x, ok1 := <-db2_msg_key_channel
  131. y, ok2 := <-db2_msg_val_channel
  132. mutex_db2_msg.Unlock()
  133. if !ok1 || !ok2 {
  134. return
  135. }
  136. conn.Do("lpush", x, y)
  137. }
  138. }
  139. func test2(finalTime string, lastTimesIndex int) (finalIndex int) {
  140. fmt.Println("text2() starts")
  141. db1_key_channel = make(chan string, channelSize)
  142. db1_val_channel = make(chan string, channelSize)
  143. db2_time_key_channel = make(chan string, channelSize)
  144. db2_time_val_channel = make(chan string, channelSize)
  145. db2_msg_key_channel = make(chan string, channelSize)
  146. db2_msg_val_channel = make(chan string, channelSize)
  147. wantToSendHash = make(map[string]int)
  148. conn := GetConn()
  149. for i := lastTimesIndex; ; i++ {
  150. time1, err1 := conn.Do("hmget", i, "Timestamp")
  151. if err1 != nil {
  152. fmt.Println("Timestamp hmget,err1=", err1)
  153. }
  154. Timestamp_queue = interfaceToString(time1)
  155. if Timestamp_queue > finalTime {
  156. finalIndex = i
  157. fmt.Println("timeout,i", i)
  158. break
  159. } else {
  160. u_id, err := conn.Do("hmget", i, "User_id")
  161. if err != nil {
  162. fmt.Println("u_id hmget err1=", err)
  163. }
  164. User_id_queue = interfaceToString(u_id)
  165. wantToSendHash[User_id_queue] = 1
  166. messages, err2 := conn.Do("hmget", i, "Hashtags")
  167. if err2 != nil {
  168. fmt.Println("message hmget,err2=", err2)
  169. }
  170. Msg_queue = interfaceToString(messages)
  171. User_Ip = getIp(User_id_queue)
  172. pushIntoDB2(User_id_queue, Timestamp_queue, Msg_queue)
  173. }
  174. }
  175. for i := 0; i < 130; i++ {
  176. from_db0_input_wg.Add(3)
  177. go db1_Set_Goroutine()
  178. go db2_Time_Set_Goroutine()
  179. go db2_Msg_Set_Goroutine()
  180. }
  181. close(db1_key_channel)
  182. close(db1_val_channel)
  183. close(db2_time_key_channel)
  184. close(db2_time_val_channel)
  185. close(db2_msg_key_channel)
  186. close(db2_msg_val_channel)
  187. from_db0_input_wg.Wait()
  188. fmt.Println("text2() ends")
  189. return
  190. }
  191. var real_msg_channel chan string
  192. var cover_msg_channel chan string
  193. type megred struct {
  194. Megred_Timestamp []string
  195. Megred_Hashtags []string
  196. Megred_User_Id string
  197. Megred_Ip string
  198. Cover_Msg string
  199. }
  200. var from_db0_input_wg sync.WaitGroup
  201. //used for sendOut_real
  202. func putWantToSendHashIntoChannel(real_msg_channel chan<- string) {
  203. fmt.Println("wantToSendHash length", len(wantToSendHash))
  204. for i := range wantToSendHash {
  205. real_msg_channel <- i
  206. }
  207. }
  208. //used for sendOut_cov
  209. func putStatus_learningIntoChannel(cover_msg_channel chan<- string) {
  210. fmt.Println("status_learning length", len(status_learning))
  211. for _, v := range status_learning {
  212. cover_msg_channel <- v
  213. }
  214. }
  215. func getHttpConnection() net.Conn {
  216. httpConn, err := net.Dial("tcp", "192.168.32.144:20000")
  217. if err != nil {
  218. fmt.Println("conn err dial 127.0.0.1:20000 failed", err)
  219. return nil
  220. }
  221. return httpConn
  222. }
  223. //same with v1
  224. func merge(v string, conn redis.Conn) *megred {
  225. conn.Do("select", "2")
  226. a := v + "_time"
  227. b := v + "_msg"
  228. l1, err_L1 := conn.Do("LLEN", a)
  229. if err_L1 != nil {
  230. fmt.Println("err_L1=", err_L1)
  231. }
  232. length1, err_length1 := redis.Int(l1, nil)
  233. if err_length1 != nil {
  234. fmt.Println("err_length1=", err_length1)
  235. }
  236. l2, err_L2 := conn.Do("LLEN", b)
  237. if err_L2 != nil {
  238. fmt.Println("err_L2=", err_L2)
  239. }
  240. length2, err_length2 := redis.Int(l2, nil)
  241. if err_length2 != nil {
  242. fmt.Println("err_length2=", err_length2)
  243. }
  244. //time
  245. time_middle, _ := conn.Do("lrange", a, 0, length1-1)
  246. x, err_x := redis.Strings(time_middle, nil)
  247. if err_x != nil {
  248. fmt.Println("err_x=", err_x)
  249. }
  250. aa := x
  251. //msg
  252. msg_middle, _ := conn.Do("lrange", b, 0, length2-1)
  253. y, errr_y := redis.Strings(msg_middle, nil)
  254. if errr_y != nil {
  255. fmt.Println("errr_y=", errr_y)
  256. }
  257. bb := y
  258. //clean two queue
  259. for i := 0; i < length1; i++ {
  260. conn.Do("rpop", a)
  261. }
  262. for i := 0; i < length2; i++ {
  263. conn.Do("rpop", b)
  264. }
  265. ip_middle := IpList[v]
  266. merged_result := &megred{
  267. aa,
  268. bb,
  269. v,
  270. ip_middle,
  271. "no",
  272. }
  273. return merged_result
  274. }
  275. var cov_lock sync.Mutex
  276. var covNumber int
  277. var real_lock sync.Mutex
  278. var realNumber int
  279. var arrival_msg_number int
  280. var learning_msg_number int
  281. var online_msg_number int
  282. var learning_msg_number_wg sync.Mutex
  283. var online_msg_number_wg sync.Mutex
  284. var test_list []string
  285. type users struct {
  286. Uid string
  287. Chance int
  288. Queue [][]byte
  289. Group *group
  290. Used_cover int
  291. Used_delay int
  292. Pointer int
  293. }
  294. type group struct {
  295. Group_id int
  296. Users_list []*users
  297. Pattern []int
  298. }
  299. //used in pattern()
  300. var k_val int
  301. var batch int
  302. var count_cover_delay_lock sync.Mutex
  303. var total_used_cover int
  304. var total_used_delay int
  305. var int_to_group map[int]*group
  306. var grouplist []*group
  307. var online_users_hash map[string]*users
  308. var online_users_hash_lock sync.Mutex
  309. var users_channel chan *users
  310. var users_channel_wg sync.WaitGroup
  311. var eliminated_users_list []string
  312. //get fixed scheduler for each anonymity set
  313. func pattern() {
  314. filePath1 := "/home/it/middle_data/filtered_good_name_list.txt"
  315. content1, err1 := ioutil.ReadFile(filePath1)
  316. if err1 != nil {
  317. fmt.Println("err1", err1)
  318. panic(err1)
  319. }
  320. m1 := strings.Split(string(content1), "\n")
  321. filePath2 := "/home/it/middle_data/groupingResult.txt"
  322. content2, err2 := ioutil.ReadFile(filePath2)
  323. if err2 != nil {
  324. fmt.Println("err2", err2)
  325. panic(err2)
  326. }
  327. m2 := strings.Split(string(content2), "\n")
  328. filePath3 := "/home/it/middle_data/pattern.txt"
  329. content3, err3 := ioutil.ReadFile(filePath3)
  330. if err3 != nil {
  331. fmt.Println("err3", err3)
  332. panic(err3)
  333. }
  334. m3 := strings.Split(string(content3), "\n")
  335. //creat k_val pattern list
  336. pattern := make([][]int, k_val-2)
  337. for i := 0; i < k_val-2; i++ {
  338. pattern[i] = make([]int, 0)
  339. }
  340. for i := 0; i < len(m3)-1; i++ {
  341. middle := strings.Split(m3[i], " ")
  342. for j := 0; j < len(middle); j++ {
  343. mm, _ := strconv.Atoi(middle[j])
  344. pattern[i] = append(pattern[i], mm)
  345. }
  346. }
  347. for i := 0; i < k_val-1; i++ {
  348. var g group
  349. g.Group_id = i + batch*k_val
  350. if i < k_val-2 {
  351. g.Pattern = pattern[i]
  352. }
  353. g.Users_list = make([]*users, 0)
  354. int_to_group[i+batch*k_val] = &g
  355. grouplist = append(grouplist, &g)
  356. }
  357. //creat structure for each user
  358. for i := 0; i < len(m2)-1; i++ {
  359. var u users
  360. u.Uid = m1[i]
  361. u.Chance = 20 // init chance
  362. u.Queue = make([][]byte, 0)
  363. middle, _ := strconv.Atoi(m2[i])
  364. u.Group = int_to_group[middle]
  365. u.Used_cover = 0
  366. u.Used_delay = 0
  367. u.Pointer = 0
  368. int_to_group[middle].Users_list = append(int_to_group[middle].Users_list, &u)
  369. online_users_hash_lock.Lock()
  370. online_users_hash[u.Uid] = &u
  371. online_users_hash_lock.Unlock()
  372. }
  373. for _, v := range int_to_group {
  374. fmt.Println(v.Pattern, len(v.Users_list))
  375. }
  376. users_numbers := 0
  377. special_numbers := 0
  378. for i, v := range online_users_hash {
  379. if i[0] > '9' || i[0] < '0' || i == "" {
  380. log.Println("err", i, v)
  381. special_numbers++
  382. } else {
  383. users_numbers++
  384. }
  385. }
  386. log.Println()
  387. log.Println("users_numbers", users_numbers, "special_numbers", special_numbers)
  388. }
  389. //cache messages from client queue into scheduler queue
  390. func send_online(c net.Conn) {
  391. defer log.Println("send_online ends")
  392. users_channel = make(chan *users, 100000)
  393. for _, v := range online_users_hash {
  394. if v == nil {
  395. continue
  396. }
  397. if v.Group.Group_id%k_val != k_val-2 {
  398. users_channel <- v
  399. }
  400. }
  401. close(users_channel)
  402. for i := 0; i < 1000; i++ {
  403. users_channel_wg.Add(1)
  404. go send_each_user(c)
  405. }
  406. users_channel_wg.Wait()
  407. }
  408. //follow the fixed scheduler, use it to reshape users' traffic
  409. func send_each_user(c net.Conn) {
  410. defer users_channel_wg.Done()
  411. for {
  412. u, ok := <-users_channel
  413. if !ok {
  414. return
  415. } else {
  416. if len(u.Group.Pattern) == 0 {
  417. log.Println("err,u=", u)
  418. }
  419. step := u.Group.Pattern[u.Pointer] //pointer points to next bit
  420. u.Pointer = u.Pointer + 1
  421. if step == 1 { //should send in this round
  422. //pop a message from scheduler queue and send it out
  423. if len(u.Queue) > 0 {
  424. x := u.Queue[0]
  425. u.Queue = u.Queue[1:]
  426. c.Write(x)
  427. continue
  428. } else {
  429. //can still use cover message
  430. if u.Chance > 0 {
  431. cover := &megred{
  432. []string{},
  433. []string{},
  434. "",
  435. "cover",
  436. "yes",
  437. }
  438. cov_mid := *cover
  439. cov_mid.Megred_User_Id = u.Uid
  440. cov_mid.Cover_Msg = "yes"
  441. cov_mid.Megred_Hashtags = []string{"[cover]"}
  442. cov_mid.Megred_Ip = IpList[u.Uid]
  443. cov_mid.Megred_Timestamp = []string{"000000000"}
  444. jsons, err_json := json.Marshal(cov_mid)
  445. if err_json != nil {
  446. fmt.Println("err_json=", err_json)
  447. }
  448. total_length := packetLength
  449. padding_length := total_length - len(jsons)
  450. pd := make([]byte, padding_length)
  451. jsons = append(jsons, pd...)
  452. if len(jsons) != packetLength {
  453. fmt.Println("err,cover=", jsons)
  454. }
  455. c.Write(jsons)
  456. u.Chance = u.Chance - 1
  457. u.Used_cover++
  458. continue
  459. } else {
  460. //queue is empty and has no chance,then do nothing
  461. continue
  462. }
  463. }
  464. } else {
  465. //step==0,should not send message in this round
  466. if len(u.Queue) > 0 { //queue is not empty, use a delay
  467. if u.Chance > 0 { //has chance, can use delay
  468. u.Chance = u.Chance - 1
  469. u.Used_delay++
  470. continue
  471. } else { //has no chance,cannot use delay
  472. x := u.Queue[0]
  473. u.Queue = u.Queue[1:]
  474. c.Write(x)
  475. continue
  476. }
  477. } else { //queue is empty,do nothing
  478. continue
  479. }
  480. }
  481. }
  482. }
  483. }
  484. //for real message, process them according to user's status
  485. //similar with v1,the only difference is that don's send real message to ACS,but firstly cache them into scheduler queue
  486. func sendOut_real(list <-chan string, c net.Conn, cover *megred) {
  487. defer finalSend_wg_real.Done()
  488. time.Sleep(time.Millisecond * 100)
  489. conn := GetConn()
  490. conn.Do("select", "2")
  491. for {
  492. v, ok := <-list
  493. if !ok {
  494. return
  495. }
  496. status_v := Status[v]
  497. switch status_v {
  498. case 0:
  499. arrival_msg_number++
  500. case 1:
  501. learning_msg_number_wg.Lock()
  502. learning_msg_number++
  503. learning_msg_number_wg.Unlock()
  504. case 2:
  505. online_msg_number_wg.Lock()
  506. online_msg_number++
  507. test_list = append(test_list, v)
  508. online_msg_number_wg.Unlock()
  509. }
  510. rst := merge(v, conn)
  511. jsons, err_json := json.Marshal(*rst)
  512. if err_json != nil {
  513. fmt.Println("err_json=", err_json)
  514. }
  515. total_length := packetLength
  516. padding_length := total_length - len(jsons)
  517. if jsons[len(jsons)-1] != 125 {
  518. fmt.Println("err,rst=", rst)
  519. break
  520. }
  521. if padding_length <= 0 {
  522. fmt.Println("too long,use cover instead of this real message")
  523. cov_mid := *cover
  524. cov_mid.Megred_User_Id = v
  525. cov_mid.Cover_Msg = "no"
  526. cov_mid.Megred_Hashtags = []string{"[other]"}
  527. cov_mid.Megred_Ip = IpList[v]
  528. cov_mid.Megred_Timestamp = []string{"000000000"}
  529. jsons, err_json := json.Marshal(cov_mid)
  530. if err_json != nil {
  531. fmt.Println("err_json=", err_json)
  532. }
  533. total_length := packetLength
  534. padding_length := total_length - len(jsons)
  535. pd := make([]byte, padding_length)
  536. jsons = append(jsons, pd...)
  537. if len(jsons) != packetLength {
  538. fmt.Println("err,cover=", jsons)
  539. }
  540. if status_v != 2 || online_users_hash[v] == nil {
  541. c.Write(jsons)
  542. real_lock.Lock()
  543. realNumber++
  544. real_lock.Unlock()
  545. continue
  546. }
  547. //online
  548. if status_v == 2 { //online
  549. if online_users_hash[v].Group.Group_id%k_val != k_val-2 {
  550. online_users_hash_lock.Lock()
  551. middle_user := online_users_hash[v]
  552. online_users_hash_lock.Unlock()
  553. middle_user.Queue = append(middle_user.Queue, jsons)
  554. continue
  555. } else { //outliers
  556. c.Write(jsons)
  557. real_lock.Lock()
  558. realNumber++
  559. real_lock.Unlock()
  560. continue
  561. }
  562. }
  563. continue
  564. }
  565. pd := make([]byte, padding_length)
  566. jsons = append(jsons, pd...)
  567. if len(jsons) != packetLength {
  568. fmt.Println("err,rst=", rst)
  569. }
  570. //not online, or outliers, directly send out
  571. if status_v != 2 || online_users_hash[v] == nil {
  572. c.Write(jsons)
  573. real_lock.Lock()
  574. realNumber++
  575. real_lock.Unlock()
  576. continue
  577. }
  578. //online
  579. if status_v == 2 { //online,not low frequence;normal or ontliers
  580. if online_users_hash[v].Group.Group_id%k_val != k_val-2 {
  581. online_users_hash_lock.Lock()
  582. middle_user := online_users_hash[v]
  583. online_users_hash_lock.Unlock()
  584. middle_user.Queue = append(middle_user.Queue, jsons)
  585. continue
  586. } else { //outliers
  587. c.Write(jsons)
  588. real_lock.Lock()
  589. realNumber++
  590. real_lock.Unlock()
  591. continue
  592. }
  593. }
  594. }
  595. }
  596. //
  597. func sendOut_cover(list <-chan string, c net.Conn, cover *megred) {
  598. defer finalSend_wg_cov.Done()
  599. time.Sleep(time.Millisecond * 100)
  600. for {
  601. v, ok := <-list
  602. if !ok {
  603. break
  604. } else {
  605. if wantToSendHash[v] == 1 {
  606. continue
  607. } else {
  608. status_v := Status[v]
  609. switch status_v {
  610. case 1:
  611. learning_msg_number_wg.Lock()
  612. learning_msg_number++
  613. learning_msg_number_wg.Unlock()
  614. }
  615. cov_mid := *cover
  616. cov_mid.Megred_User_Id = v
  617. cov_mid.Cover_Msg = "yes"
  618. cov_mid.Megred_Hashtags = []string{"[cover]"}
  619. cov_mid.Megred_Ip = IpList[v]
  620. cov_mid.Megred_Timestamp = []string{"000000000"}
  621. jsons, err_json := json.Marshal(cov_mid)
  622. if err_json != nil {
  623. fmt.Println("err_json=", err_json)
  624. }
  625. total_length := packetLength
  626. padding_length := total_length - len(jsons)
  627. pd := make([]byte, padding_length)
  628. jsons = append(jsons, pd...)
  629. if len(jsons) != packetLength {
  630. fmt.Println("出错了,长度不对,cover=", jsons)
  631. }
  632. c.Write(jsons)
  633. cov_lock.Lock()
  634. covNumber++
  635. cov_lock.Unlock()
  636. }
  637. }
  638. }
  639. }
  640. //same with v1
  641. func roundEnd(c net.Conn) {
  642. x := []byte("roundEnd")
  643. total_length := packetLength
  644. padding_length := total_length - len(x)
  645. pd := make([]byte, padding_length)
  646. x = append(x, pd...)
  647. if len(x) != packetLength {
  648. fmt.Println("err,x=", x)
  649. }
  650. c.Write(x)
  651. }
  652. var finalSend_wg_cov sync.WaitGroup
  653. var finalSend_wg_real sync.WaitGroup
  654. //send messages from scheduler queue to agent/ACS
  655. func finalSend(c net.Conn) {
  656. cover := &megred{
  657. []string{},
  658. []string{},
  659. "",
  660. "cover",
  661. "yes",
  662. }
  663. real_msg_channel = make(chan string)
  664. cover_msg_channel = make(chan string)
  665. for i := 0; i < 1000; i++ {
  666. finalSend_wg_real.Add(1)
  667. finalSend_wg_cov.Add(1)
  668. go sendOut_real(real_msg_channel, c, cover)
  669. go sendOut_cover(cover_msg_channel, c, cover)
  670. }
  671. putWantToSendHashIntoChannel(real_msg_channel)
  672. putStatus_learningIntoChannel(cover_msg_channel)
  673. close(real_msg_channel)
  674. close(cover_msg_channel)
  675. finalSend_wg_real.Wait()
  676. finalSend_wg_cov.Wait()
  677. time.Sleep(time.Millisecond * 100)
  678. send_online(c)
  679. roundEnd(c)
  680. }
  681. func initDB1_DB2() {
  682. conn := GetConn()
  683. conn.Do("select", "1")
  684. conn.Do("flushdb")
  685. conn.Do("keys *")
  686. conn.Do("select", "2")
  687. conn.Do("flushdb")
  688. conn.Do("keys *")
  689. }
  690. func start() {
  691. initDB1_DB2()
  692. times = 0
  693. IpList = make(map[string]string)
  694. Status = make(map[string]int)
  695. NameList = make([]string, 0)
  696. status_learning = make([]string, 0)
  697. packetLength = 3000
  698. k_val = 15
  699. batch = 0
  700. int_to_group = make(map[int]*group)
  701. grouplist = make([]*group, 0)
  702. total_used_cover = 0
  703. total_used_delay = 0
  704. online_users_hash = make(map[string]*users)
  705. eliminated_users_list = make([]string, 0)
  706. }
  707. func timeAddOneHour(input string) (outpute string) {
  708. loc, err_loc := time.LoadLocation("Local")
  709. if err_loc != nil {
  710. fmt.Println("err_loc=", err_loc)
  711. }
  712. the_time, err_the_time := time.ParseInLocation("20060102150405", input, loc)
  713. if err_the_time != nil {
  714. fmt.Println("err_the_time=", err_the_time)
  715. }
  716. h, _ := time.ParseDuration("1h")
  717. h1 := the_time.Add(1 * h)
  718. //h, _ := time.ParseDuration("1m")
  719. //h1 := the_time.Add(30 * h)
  720. //h, _ := time.ParseDuration("1h")
  721. //h1 := the_time.Add(2 * h)
  722. timeString := h1.Format("20060102150405")
  723. outpute = timeString
  724. return
  725. }
  726. var learning_phase_start_round int
  727. var round int
  728. //same with v1
  729. func StatusChangeOrNot_arrival_to_learning() {
  730. defer log.Println("StatusChangeOrNot_arrival_to_learning() ends")
  731. fmt.Println("StatusChangeOrNot_arrival_to_learning() starts")
  732. listener, err := net.Listen("tcp", "192.168.32.144:20002")
  733. if err != nil {
  734. fmt.Println("StatusChangeOrNot_arrival_to_learning(),as server,listen 192.168.32.144:20002 failed ", err)
  735. return
  736. }
  737. conn, err := listener.Accept()
  738. if err != nil {
  739. fmt.Println("accept failed,StatusChangeOrNot_arrival_to_learning err", err)
  740. return
  741. }
  742. var tmp [10000]byte
  743. res := ""
  744. for {
  745. n, err := conn.Read(tmp[:])
  746. if err != nil {
  747. fmt.Println("read from conn failed", err)
  748. break
  749. }
  750. if string(tmp[:8]) == "noChange" {
  751. log.Println("StatusChangeOrNot_arrival_to_learning ends,no change")
  752. listener.Close()
  753. return
  754. } else {
  755. result := string(tmp[:n])
  756. res = res + result
  757. if string(tmp[:9]) == "changeEnd" {
  758. listener.Close()
  759. break
  760. }
  761. }
  762. }
  763. res = strings.Replace(res, " ", "", -1)
  764. res = strings.Replace(res, "\n", "", -1)
  765. str_arr := strings.Split(res, ",")
  766. learning_phase_start_round = round
  767. for _, str := range str_arr {
  768. Status[str] = 1
  769. status_learning = append(status_learning, str)
  770. }
  771. fmt.Println("StatusChangeOrNot_arrival_to_learningends, someone has changed=", len(status_learning))
  772. }
  773. //same with v1
  774. func StatusChangeOrNot_eliminated() {
  775. defer fmt.Println("StatusChangeOrNot_eliminated ends")
  776. log.Println("StatusChangeOrNot_eliminated() starts")
  777. listener, err := net.Listen("tcp", "192.168.32.144:20002")
  778. if err != nil {
  779. fmt.Println("StatusChangeOrNot_eliminated(), as server,listen 192.168.32.144:20002 failed", err)
  780. return
  781. }
  782. conn, err := listener.Accept()
  783. if err != nil {
  784. fmt.Println("accept failed,StatusChangeOrNot_eliminated err", err)
  785. return
  786. }
  787. var tmp [10000]byte
  788. res := ""
  789. for {
  790. n, err := conn.Read(tmp[:])
  791. if err != nil {
  792. fmt.Println("read from conn failed", err)
  793. break
  794. }
  795. if string(tmp[:8]) == "noChange" {
  796. fmt.Println("StatusChangeOrNot_eliminatedends, no change")
  797. listener.Close()
  798. return
  799. } else {
  800. result := string(tmp[:n])
  801. res = res + result
  802. if string(tmp[:9]) == "changeEnd" {
  803. listener.Close()
  804. break
  805. }
  806. }
  807. }
  808. res = strings.Replace(res, " ", "", -1)
  809. res = strings.Replace(res, "\n", "", -1)
  810. str_arr := strings.Split(res, ",")
  811. //eliminated users
  812. for _, str := range str_arr {
  813. Status[str] = 3
  814. online_users_hash_lock.Lock()
  815. middle_u := online_users_hash[str]
  816. online_users_hash_lock.Unlock()
  817. if middle_u == nil {
  818. continue
  819. }
  820. if middle_u.Group.Group_id%k_val == k_val-2 {
  821. continue
  822. }
  823. count_cover_delay_lock.Lock()
  824. total_used_cover = total_used_cover + middle_u.Used_cover
  825. total_used_delay = total_used_delay + middle_u.Used_delay
  826. count_cover_delay_lock.Unlock()
  827. middle_u = nil
  828. online_users_hash[str] = nil
  829. }
  830. log.Println("eliminated users are", str_arr)
  831. }
  832. //same with v1
  833. func refresh_status_learning() {
  834. if round-learning_phase_start_round == 24 {
  835. fmt.Println("len(status_learning)=", len(status_learning))
  836. status_learning = make([]string, 0)
  837. }
  838. }
  839. //same with v1
  840. func StatusChangeOrNot_learning_to_online() {
  841. defer log.Println("StatusChangeOrNot_learning_to_online() ends")
  842. log.Println("StatusChangeOrNot_learning_to_online() starts")
  843. listener, err := net.Listen("tcp", "192.168.32.144:20002")
  844. if err != nil {
  845. fmt.Println("StatusChangeOrNot_learning_to_online(),as server,listen 192.168.32.144:20002 failed", err)
  846. return
  847. }
  848. conn, err := listener.Accept()
  849. if err != nil {
  850. fmt.Println("accept failed", err)
  851. return
  852. }
  853. var tmp [10000]byte
  854. res := ""
  855. for {
  856. n, err := conn.Read(tmp[:])
  857. if err != nil {
  858. fmt.Println("read from conn failed", err)
  859. break
  860. }
  861. if string(tmp[:8]) == "noChange" {
  862. listener.Close()
  863. return
  864. } else {
  865. result := string(tmp[:n])
  866. res = res + result
  867. if string(tmp[:9]) == "changeEnd" {
  868. listener.Close()
  869. break
  870. }
  871. }
  872. }
  873. res = strings.Replace(res, " ", "", -1)
  874. res = strings.Replace(res, "\n", "", -1)
  875. str_arr := strings.Split(res, ",")
  876. //set status to 1
  877. number := 0
  878. for i := 0; i < len(str_arr)-1; i++ {
  879. Status[str_arr[i]] = 2
  880. number++
  881. }
  882. fmt.Println("changed status number=", number)
  883. pattern()
  884. batch = batch + 1
  885. }
  886. func main() {
  887. //lof
  888. logFile, err := os.OpenFile("log.txt", os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666)
  889. if err != nil {
  890. panic(err)
  891. }
  892. defer logFile.Close()
  893. mw := io.MultiWriter(os.Stdout, logFile)
  894. log.SetOutput(mw)
  895. start()
  896. channelSize = 150000
  897. //1st test point:
  898. lastTimesIndex := 0
  899. roundEndTime := "20121101130000"
  900. //2ed test point:
  901. //lastTimesIndex := 1479975
  902. //roundEndTime := "20121110130000"
  903. for i := 0; i < 35; i++ {
  904. fmt.Println("------------------------------------------------------------the", i, "th round starts------------------------------------------------------------------")
  905. c := getHttpConnection()
  906. arrival_msg_number = 0
  907. learning_msg_number = 0
  908. online_msg_number = 0
  909. rejoin_number = 0
  910. round = i
  911. covNumber = 0
  912. realNumber = 0
  913. lastTimesIndex = test2(roundEndTime, lastTimesIndex)
  914. fmt.Println("lastTimesIndex=", lastTimesIndex)
  915. roundEndTime = timeAddOneHour(roundEndTime)
  916. fmt.Println("roundEndTime=", roundEndTime)
  917. fmt.Println("status length", len(Status), "NameList length", len(NameList), "status_learning length", len(status_learning))
  918. finalSend(c)
  919. refresh_status_learning()
  920. fmt.Println("Status length", len(Status))
  921. StatusChangeOrNot_arrival_to_learning()
  922. StatusChangeOrNot_learning_to_online()
  923. StatusChangeOrNot_eliminated()
  924. fmt.Println("covNumber=", covNumber)
  925. fmt.Println("realNumber=", realNumber)
  926. fmt.Println("arrival_msg_number=", arrival_msg_number, "earning_msg_number=", learning_msg_number, "online_msg_number=", online_msg_number)
  927. log.Println("rejoin_number", rejoin_number)
  928. fmt.Println("------------------------------------------------------------the", i, "th round ends------------------------------------------------------------------")
  929. }
  930. log.Println("total_used_cover,total_used_delay=", total_used_cover, total_used_delay)
  931. users_number := make([]int, 0)
  932. total_delay := 0
  933. total_cover := 0
  934. total_queue := 0
  935. total_user := 0
  936. for _, v := range grouplist {
  937. for _, m := range v.Users_list {
  938. total_delay = total_delay + m.Used_delay
  939. total_cover = total_cover + m.Used_cover
  940. total_queue = total_queue + len(m.Queue)
  941. }
  942. total_user = total_user + len(v.Users_list)
  943. users_number = append(users_number, len(v.Users_list))
  944. }
  945. log.Println("users_number", users_number)
  946. log.Println("average delay", float64(total_delay)/float64(total_user), " average cover=", float64(total_cover)/float64(total_user), "average queue=", float64(total_queue)/float64(total_user))
  947. }