main.go 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "io"
  6. "log"
  7. "net"
  8. "os"
  9. "strconv"
  10. "strings"
  11. "sync"
  12. "time"
  13. "github.com/gomodule/redigo/redis"
  14. )
  15. var channelSize int
  16. var Status map[string]int //store each users' status
  17. var status_learning []string //users who are in learning phase
  18. //all users and theri ip address
  19. var NameList []string
  20. var IpList map[string]string
  21. //test2 uses this hashmap to get from Sample Data, in each round, there are how many messages
  22. var wantToSendHash map[string]int
  23. var packetLength int
  24. //get connection with redis
  25. func GetConn() redis.Conn {
  26. conn, err := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
  27. if err != nil {
  28. fmt.Println("redis err", err)
  29. return nil
  30. }
  31. return conn
  32. }
  33. //change the interface type into string type
  34. func interfaceToString(x interface{}) string {
  35. xx, err1 := redis.Strings(x, nil)
  36. if err1 != nil {
  37. fmt.Println("interfaceToString err", err1)
  38. fmt.Println("xx=", xx, "xx[0]=", xx[0])
  39. }
  40. xxx := xx[0]
  41. return xxx
  42. }
  43. var times int //used to set ip address
  44. //set ip address
  45. func setIp() string {
  46. str := strconv.Itoa(times)
  47. times = times + 1
  48. return str
  49. }
  50. var rejoin_number int
  51. //set ip address for users
  52. func getIp(uid string) string {
  53. if IpList[uid] != "" {
  54. if Status[uid] != 3 {
  55. return IpList[uid]
  56. } else { //rejoin users
  57. rejoin_number++
  58. Status[uid] = 0
  59. x := setIp()
  60. IpList[uid] = x
  61. db1_key_channel <- uid
  62. db1_val_channel <- x
  63. return x
  64. }
  65. } else { //new users
  66. NameList = append(NameList, uid)
  67. Status[uid] = 0
  68. x := setIp()
  69. IpList[uid] = x
  70. db1_key_channel <- uid
  71. db1_val_channel <- x
  72. return x
  73. }
  74. }
  75. //db2, client queue;cache time and messages in clients
  76. func pushIntoDB2(uid, time, msg string) {
  77. k1 := uid + "_msg"
  78. k2 := uid + "_time"
  79. db2_msg_key_channel <- k1
  80. db2_time_key_channel <- k2
  81. db2_msg_val_channel <- msg
  82. db2_time_val_channel <- time
  83. }
  84. //4 Variables used in itest2
  85. var User_id_queue string
  86. var Msg_queue string
  87. var Timestamp_queue string
  88. var User_Ip string
  89. //used in db1,key is uid; val is ip
  90. var db1_key_channel chan string
  91. var db1_val_channel chan string
  92. var mutex_db1 sync.Mutex
  93. func db1_Set_Goroutine() {
  94. defer from_db0_input_wg.Done()
  95. conn := GetConn()
  96. conn.Do("select", 1)
  97. for {
  98. mutex_db1.Lock()
  99. x, ok1 := <-db1_key_channel
  100. y, ok2 := <-db1_val_channel
  101. mutex_db1.Unlock()
  102. if !ok1 || !ok2 {
  103. return
  104. }
  105. conn.Do("set", x, y)
  106. }
  107. }
  108. //db2's channels, used to store timestamp in db2
  109. var db2_time_key_channel chan string
  110. var db2_time_val_channel chan string
  111. var mutex_db2_time sync.Mutex
  112. func db2_Time_Set_Goroutine() {
  113. defer from_db0_input_wg.Done()
  114. conn := GetConn()
  115. conn.Do("select", 2)
  116. for {
  117. mutex_db2_time.Lock()
  118. x, ok1 := <-db2_time_key_channel
  119. y, ok2 := <-db2_time_val_channel
  120. mutex_db2_time.Unlock()
  121. if !ok1 || !ok2 {
  122. return
  123. }
  124. conn.Do("lpush", x, y)
  125. }
  126. }
  127. //db2's channel. used to store messages in db2
  128. var db2_msg_key_channel chan string
  129. var db2_msg_val_channel chan string
  130. var mutex_db2_msg sync.Mutex
  131. func db2_Msg_Set_Goroutine() {
  132. defer from_db0_input_wg.Done()
  133. conn := GetConn()
  134. conn.Do("select", 2)
  135. for {
  136. mutex_db2_msg.Lock()
  137. x, ok1 := <-db2_msg_key_channel
  138. y, ok2 := <-db2_msg_val_channel
  139. mutex_db2_msg.Unlock()
  140. if !ok1 || !ok2 {
  141. return
  142. }
  143. conn.Do("lpush", x, y)
  144. }
  145. }
  146. //test2 is used to get messages from sample data in each round
  147. //get all messages in wach hour
  148. func test2(finalTime string, lastTimesIndex int) (finalIndex int) {
  149. fmt.Println("text2() starts")
  150. //6 goroutines init
  151. db1_key_channel = make(chan string, channelSize)
  152. db1_val_channel = make(chan string, channelSize)
  153. db2_time_key_channel = make(chan string, channelSize)
  154. db2_time_val_channel = make(chan string, channelSize)
  155. db2_msg_key_channel = make(chan string, channelSize)
  156. db2_msg_val_channel = make(chan string, channelSize)
  157. wantToSendHash = make(map[string]int)
  158. conn := GetConn()
  159. for i := lastTimesIndex; ; i++ {
  160. //Used to determine if a timeout has occurred
  161. time1, err1 := conn.Do("hmget", i, "Timestamp")
  162. if err1 != nil {
  163. fmt.Println("Timestamp hmget err,err1=", err1)
  164. }
  165. Timestamp_queue = interfaceToString(time1)
  166. //timeout
  167. if Timestamp_queue > finalTime {
  168. finalIndex = i
  169. fmt.Println("i", i)
  170. break
  171. } else {
  172. //no timeout
  173. u_id, err := conn.Do("hmget", i, "User_id")
  174. if err != nil {
  175. fmt.Println("u_id hmget err,err1=", err)
  176. }
  177. User_id_queue = interfaceToString(u_id)
  178. wantToSendHash[User_id_queue] = 1
  179. messages, err2 := conn.Do("hmget", i, "Hashtags")
  180. if err2 != nil {
  181. fmt.Println("message hmget err,err2=", err2)
  182. }
  183. Msg_queue = interfaceToString(messages)
  184. User_Ip = getIp(User_id_queue)
  185. pushIntoDB2(User_id_queue, Timestamp_queue, Msg_queue)
  186. }
  187. }
  188. fmt.Println("for loop ends")
  189. //until now, all messages in this round have been taken from sample data;
  190. for i := 0; i < 130; i++ {
  191. from_db0_input_wg.Add(3)
  192. go db1_Set_Goroutine()
  193. go db2_Time_Set_Goroutine()
  194. go db2_Msg_Set_Goroutine()
  195. }
  196. close(db1_key_channel)
  197. close(db1_val_channel)
  198. close(db2_time_key_channel)
  199. close(db2_time_val_channel)
  200. close(db2_msg_key_channel)
  201. close(db2_msg_val_channel)
  202. from_db0_input_wg.Wait()
  203. fmt.Println("text2() ends")
  204. return
  205. }
  206. //used to porcess these messages from taxt2()
  207. var real_msg_channel chan string
  208. var cover_msg_channel chan string
  209. //used to merged messages
  210. type megred struct {
  211. Megred_Timestamp []string
  212. Megred_Hashtags []string
  213. Megred_User_Id string
  214. Megred_Ip string
  215. Cover_Msg string //if it is a cover message
  216. }
  217. type users struct {
  218. Uid string
  219. Cover int
  220. Delay int
  221. Queue []string
  222. }
  223. type group struct {
  224. Group_id int
  225. Users_list []*users //
  226. Pattern []int //
  227. }
  228. var from_db0_input_wg sync.WaitGroup
  229. //used in sendOut_real; put all uid in wantToSendHash into real_msg_channel
  230. func putWantToSendHashIntoChannel(real_msg_channel chan<- string) {
  231. fmt.Println("len(wantToSendHash)", len(wantToSendHash))
  232. for i := range wantToSendHash {
  233. real_msg_channel <- i
  234. }
  235. fmt.Println("already pu all uid in wantToSendHash into channel")
  236. }
  237. //used in sendOut_cov; all uid in status_learning should send message(cover/real); if they don't have real. then they must send cover message
  238. func putStatus_learningIntoChannel(cover_msg_channel chan<- string) {
  239. fmt.Println("len(status_learning)", len(status_learning))
  240. for _, v := range status_learning {
  241. cover_msg_channel <- v
  242. }
  243. fmt.Println("already all uid in status_learning into channel")
  244. }
  245. //connect to agent
  246. func getHttpConnection() net.Conn {
  247. httpConn, err := net.Dial("tcp", "192.168.32.144:20000")
  248. if err != nil {
  249. fmt.Println("http conn err,dial 192.168.32.144:20000 failed", err)
  250. return nil
  251. }
  252. return httpConn
  253. }
  254. //taken all message in db2, merge them into structure
  255. func merge(v string, conn redis.Conn) *megred {
  256. conn.Do("select", "2")
  257. a := v + "_time"
  258. b := v + "_msg"
  259. l1, err_L1 := conn.Do("LLEN", a)
  260. if err_L1 != nil {
  261. fmt.Println("err_L1=", err_L1)
  262. }
  263. length1, err_length1 := redis.Int(l1, nil)
  264. if err_length1 != nil {
  265. fmt.Println("err_length1=", err_length1)
  266. }
  267. l2, err_L2 := conn.Do("LLEN", b)
  268. if err_L2 != nil {
  269. fmt.Println("err_L2=", err_L2)
  270. }
  271. length2, err_length2 := redis.Int(l2, nil)
  272. if err_length2 != nil {
  273. fmt.Println("err_length2=", err_length2)
  274. }
  275. time_middle, _ := conn.Do("lrange", a, 0, length1-1)
  276. x, err_x := redis.Strings(time_middle, nil)
  277. if err_x != nil {
  278. fmt.Println("err_x=", err_x)
  279. }
  280. aa := x
  281. msg_middle, _ := conn.Do("lrange", b, 0, length2-1)
  282. y, errr_y := redis.Strings(msg_middle, nil)
  283. if errr_y != nil {
  284. fmt.Println("errr_y=", errr_y)
  285. }
  286. bb := y
  287. //clean two queue
  288. for i := 0; i < length1; i++ {
  289. conn.Do("rpop", a)
  290. }
  291. for i := 0; i < length2; i++ {
  292. conn.Do("rpop", b)
  293. }
  294. //do merging
  295. ip_middle := IpList[v]
  296. merged_result := &megred{
  297. aa,
  298. bb,
  299. v,
  300. ip_middle,
  301. "no", //only merge real messages; so not a cover
  302. }
  303. return merged_result
  304. }
  305. var cov_lock sync.Mutex
  306. var covNumber int
  307. var real_lock sync.Mutex
  308. var realNumber int
  309. var arrival_msg_number int
  310. var learning_msg_number int
  311. var online_msg_number int
  312. var learning_msg_number_wg sync.Mutex
  313. var online_msg_number_wg sync.Mutex
  314. var test_list []string
  315. //send real messages out
  316. func sendOut_real(list <-chan string, c net.Conn, cover *megred) {
  317. defer finalSend_wg_real.Done()
  318. time.Sleep(time.Millisecond * 100)
  319. conn := GetConn()
  320. conn.Do("select", "2")
  321. for {
  322. v, ok := <-list
  323. if !ok {
  324. return
  325. }
  326. status_v := Status[v]
  327. switch status_v {
  328. case 0:
  329. arrival_msg_number++
  330. case 1:
  331. learning_msg_number_wg.Lock()
  332. learning_msg_number++
  333. learning_msg_number_wg.Unlock()
  334. case 2:
  335. online_msg_number_wg.Lock()
  336. online_msg_number++
  337. test_list = append(test_list, v)
  338. online_msg_number_wg.Unlock()
  339. }
  340. rst := merge(v, conn) //get merged message
  341. jsons, err_json := json.Marshal(*rst)
  342. if err_json != nil {
  343. fmt.Println("err_json=", err_json)
  344. }
  345. //do padding
  346. total_length := packetLength
  347. padding_length := total_length - len(jsons)
  348. if jsons[len(jsons)-1] != 125 {
  349. fmt.Println("rst=", rst)
  350. break
  351. }
  352. //it is too long, we use a cover message for this too loooooong real message
  353. if padding_length <= 0 {
  354. cov_mid := *cover
  355. cov_mid.Megred_User_Id = v
  356. cov_mid.Cover_Msg = "no"
  357. cov_mid.Megred_Hashtags = []string{"[other]"}
  358. cov_mid.Megred_Ip = IpList[v]
  359. cov_mid.Megred_Timestamp = []string{"000000000"}
  360. jsons, err_json := json.Marshal(cov_mid)
  361. if err_json != nil {
  362. fmt.Println("err_json=", err_json)
  363. }
  364. total_length := packetLength
  365. padding_length := total_length - len(jsons)
  366. pd := make([]byte, padding_length)
  367. jsons = append(jsons, pd...)
  368. if len(jsons) != packetLength {
  369. fmt.Println("err,cover=", jsons)
  370. }
  371. c.Write(jsons)
  372. real_lock.Lock()
  373. realNumber++
  374. real_lock.Unlock()
  375. continue
  376. }
  377. pd := make([]byte, padding_length)
  378. jsons = append(jsons, pd...)
  379. if len(jsons) != packetLength {
  380. fmt.Println("err,rst=", rst)
  381. }
  382. c.Write(jsons)
  383. real_lock.Lock()
  384. realNumber++
  385. real_lock.Unlock()
  386. }
  387. }
  388. //used to send out cover messages
  389. func sendOut_cover(list <-chan string, c net.Conn, cover *megred) {
  390. defer finalSend_wg_cov.Done()
  391. time.Sleep(time.Millisecond * 100)
  392. for {
  393. v, ok := <-list
  394. if !ok {
  395. break
  396. } else {
  397. if wantToSendHash[v] == 1 {
  398. continue //user in learning phase and has a real message in this round.then he don's need to send cover message
  399. } else {
  400. //should send cover message
  401. status_v := Status[v]
  402. switch status_v {
  403. case 1:
  404. learning_msg_number_wg.Lock()
  405. learning_msg_number++
  406. learning_msg_number_wg.Unlock()
  407. }
  408. cov_mid := *cover
  409. cov_mid.Megred_User_Id = v
  410. cov_mid.Cover_Msg = "yes"
  411. cov_mid.Megred_Hashtags = []string{"[cover]"}
  412. cov_mid.Megred_Ip = IpList[v]
  413. cov_mid.Megred_Timestamp = []string{"000000000"}
  414. jsons, err_json := json.Marshal(cov_mid)
  415. if err_json != nil {
  416. fmt.Println("err_json=", err_json)
  417. }
  418. total_length := packetLength
  419. padding_length := total_length - len(jsons)
  420. pd := make([]byte, padding_length)
  421. jsons = append(jsons, pd...)
  422. if len(jsons) != packetLength {
  423. fmt.Println("err,cover=", jsons)
  424. }
  425. c.Write(jsons)
  426. cov_lock.Lock()
  427. covNumber++
  428. cov_lock.Unlock()
  429. }
  430. }
  431. }
  432. }
  433. //tell agent, this round ends, it also needs to be padded
  434. func roundEnd(c net.Conn) {
  435. fmt.Println("send roundEnd")
  436. x := []byte("roundEnd")
  437. total_length := packetLength
  438. padding_length := total_length - len(x)
  439. pd := make([]byte, padding_length)
  440. x = append(x, pd...)
  441. if len(x) != packetLength {
  442. fmt.Println("err,x=", x)
  443. }
  444. c.Write(x)
  445. }
  446. var finalSend_wg_cov sync.WaitGroup
  447. var finalSend_wg_real sync.WaitGroup
  448. //call all send actions
  449. func finalSend() {
  450. c := getHttpConnection()
  451. cover := &megred{
  452. []string{},
  453. []string{},
  454. "",
  455. "cover",
  456. "yes",
  457. }
  458. real_msg_channel = make(chan string)
  459. cover_msg_channel = make(chan string)
  460. for i := 0; i < 1000; i++ {
  461. finalSend_wg_real.Add(1)
  462. finalSend_wg_cov.Add(1)
  463. go sendOut_real(real_msg_channel, c, cover)
  464. go sendOut_cover(cover_msg_channel, c, cover)
  465. }
  466. putWantToSendHashIntoChannel(real_msg_channel)
  467. putStatus_learningIntoChannel(cover_msg_channel)
  468. close(real_msg_channel)
  469. close(cover_msg_channel)
  470. finalSend_wg_real.Wait()
  471. finalSend_wg_cov.Wait()
  472. time.Sleep(time.Millisecond * 100)
  473. roundEnd(c)
  474. }
  475. //init db1,2
  476. func initDB1_DB2() {
  477. conn := GetConn()
  478. conn.Do("select", "1")
  479. conn.Do("flushdb")
  480. conn.Do("keys *")
  481. conn.Do("select", "2")
  482. conn.Do("flushdb")
  483. conn.Do("keys *")
  484. }
  485. //some init works
  486. func start() {
  487. initDB1_DB2()
  488. times = 0
  489. IpList = make(map[string]string)
  490. Status = make(map[string]int)
  491. NameList = make([]string, 0)
  492. status_learning = make([]string, 0)
  493. packetLength = 3000
  494. }
  495. //Calculate the time, because text2 takes messages within one hour each time, and needs this time as deadline
  496. func timeAddOneHour(input string) (outpute string) {
  497. loc, err_loc := time.LoadLocation("Local")
  498. if err_loc != nil {
  499. fmt.Println("err_loc=", err_loc)
  500. }
  501. the_time, err_the_time := time.ParseInLocation("20060102150405", input, loc)
  502. if err_the_time != nil {
  503. fmt.Println("err_the_time=", err_the_time)
  504. }
  505. //1 hour; 2 hours; 30 min
  506. h, _ := time.ParseDuration("1h")
  507. h1 := the_time.Add(1 * h)
  508. //h, _ := time.ParseDuration("1m")
  509. //h1 := the_time.Add(30 * h)
  510. //h, _ := time.ParseDuration("1h")
  511. //h1 := the_time.Add(2 * h)
  512. timeString := h1.Format("20060102150405")
  513. outpute = timeString
  514. return
  515. }
  516. var learning_phase_start_round int
  517. var round int
  518. //reaceive the feedback from agent; this is for users from arrival to learning
  519. func StatusChangeOrNot_arrival_to_learning() {
  520. defer log.Println("StatusChangeOrNot_arrival_to_learning() ends")
  521. fmt.Println("starts StatusChangeOrNot_arrival_to_learning()")
  522. listener, err := net.Listen("tcp", "192.168.32.144:20002")
  523. if err != nil {
  524. fmt.Println("StatusChangeOrNot_arrival_to_learning(),as server,listen 192.168.32.144:20002 fails", err)
  525. return
  526. }
  527. conn, err := listener.Accept()
  528. if err != nil {
  529. fmt.Println("accept failed,StatusChangeOrNot_arrival_to_learning conn err", err)
  530. return
  531. }
  532. var tmp [10000]byte
  533. res := ""
  534. log.Println("all users who have changed their status")
  535. for {
  536. n, err := conn.Read(tmp[:])
  537. if err != nil {
  538. fmt.Println("read from conn failed", err)
  539. break
  540. }
  541. //no change, directly return
  542. if string(tmp[:8]) == "noChange" {
  543. log.Println("StatusChangeOrNot_arrival_to_learning ends,no user changes status")
  544. listener.Close()
  545. return
  546. } else {
  547. result := string(tmp[:n])
  548. res = res + result
  549. if string(tmp[:9]) == "changeEnd" {
  550. fmt.Println("received all status")
  551. listener.Close()
  552. break
  553. }
  554. }
  555. }
  556. //space
  557. res = strings.Replace(res, " ", "", -1)
  558. //line
  559. res = strings.Replace(res, "\n", "", -1)
  560. str_arr := strings.Split(res, ",")
  561. learning_phase_start_round = round
  562. for _, str := range str_arr {
  563. Status[str] = 1
  564. status_learning = append(status_learning, str)
  565. }
  566. fmt.Println("StatusChangeOrNot_arrival_to_learning ends。someone has changed his status, len(status_learning)=", len(status_learning))
  567. }
  568. //feedback, users who have been eliminated in online phase
  569. func StatusChangeOrNot_eliminated() {
  570. defer fmt.Println("StatusChangeOrNot_eliminated ends")
  571. log.Println("start StatusChangeOrNot_eliminated()")
  572. listener, err := net.Listen("tcp", "192.168.32.144:20002")
  573. if err != nil {
  574. fmt.Println("StatusChangeOrNot_eliminated(),as server,listen 192.168.32.144:20002 failed", err)
  575. return
  576. }
  577. conn, err := listener.Accept()
  578. if err != nil {
  579. fmt.Println("accept failed,StatusChangeOrNot_eliminated conn err", err)
  580. return
  581. }
  582. var tmp [10000]byte
  583. res := ""
  584. log.Println("eliminated uid")
  585. for {
  586. n, err := conn.Read(tmp[:])
  587. if err != nil {
  588. fmt.Println("read from conn failed", err)
  589. break
  590. }
  591. //no change
  592. if string(tmp[:8]) == "noChange" {
  593. fmt.Println("StatusChangeOrNot_eliminatedends,no change")
  594. listener.Close()
  595. return
  596. } else {
  597. result := string(tmp[:n])
  598. res = res + result
  599. if string(tmp[:9]) == "changeEnd" {
  600. fmt.Println("received all status")
  601. listener.Close()
  602. break
  603. }
  604. }
  605. }
  606. //space
  607. res = strings.Replace(res, " ", "", -1)
  608. //line
  609. res = strings.Replace(res, "\n", "", -1)
  610. str_arr := strings.Split(res, ",")
  611. //set status =3, which means eliminated
  612. for _, str := range str_arr {
  613. Status[str] = 3
  614. }
  615. log.Println("StatusChangeOrNot_eliminatedends,len(str_arr)=", len(str_arr))
  616. log.Println("the eliminated users are", str_arr)
  617. }
  618. //Learning phase's length,24/48/72 rounds
  619. func refresh_status_learning() {
  620. if round-learning_phase_start_round == 24 { //24,48,72
  621. fmt.Println("start refresh_status_learning")
  622. fmt.Println("status_learning is refreshed,len(status_learning)=", len(status_learning))
  623. status_learning = make([]string, 0)
  624. }
  625. }
  626. //users from learning to online phase
  627. func StatusChangeOrNot_learning_to_online() {
  628. defer log.Println("StatusChangeOrNot_learning_to_online() ends")
  629. log.Println("进入StatusChangeOrNot_learning_to_online() starts")
  630. listener, err := net.Listen("tcp", "192.168.32.144:20002")
  631. if err != nil {
  632. fmt.Println("StatusChangeOrNot_learning_to_online(),as server,listen 192.168.32.144:20002 failed", err)
  633. return
  634. }
  635. conn, err := listener.Accept()
  636. if err != nil {
  637. fmt.Println("accept failed", err)
  638. return
  639. }
  640. var tmp [10000]byte
  641. res := ""
  642. for {
  643. n, err := conn.Read(tmp[:])
  644. if err != nil {
  645. fmt.Println("read from conn failed", err)
  646. break
  647. }
  648. //no change
  649. if string(tmp[:8]) == "noChange" {
  650. log.Println("no change")
  651. listener.Close()
  652. return
  653. } else {
  654. result := string(tmp[:n])
  655. res = res + result
  656. if string(tmp[:9]) == "changeEnd" {
  657. fmt.Println("received all status")
  658. listener.Close()
  659. break
  660. }
  661. }
  662. }
  663. //space
  664. res = strings.Replace(res, " ", "", -1)
  665. //line
  666. res = strings.Replace(res, "\n", "", -1)
  667. str_arr := strings.Split(res, ",")
  668. //set status to 2,namely online
  669. number := 0
  670. for i := 0; i < len(str_arr)-1; i++ {
  671. Status[str_arr[i]] = 2
  672. number++
  673. }
  674. fmt.Println("from learning to online users' number=", number)
  675. fmt.Println("someone has changed into online phase")
  676. }
  677. func main() {
  678. //log
  679. logFile, err := os.OpenFile("log.txt", os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666)
  680. if err != nil {
  681. panic(err)
  682. }
  683. defer logFile.Close()
  684. mw := io.MultiWriter(os.Stdout, logFile)
  685. log.SetOutput(mw)
  686. start()
  687. channelSize = 150000
  688. //1st test point,1101
  689. lastTimesIndex := 0
  690. roundEndTime := "20121101130000"
  691. //2ed test point,1110
  692. //lastTimesIndex := 1479975
  693. //roundEndTime := "20121110130000"
  694. //3rd test point,1120
  695. //lastTimesIndex := 3751916
  696. //roundEndTime := "20121120130000"
  697. for i := 0; i < 35; i++ {
  698. fmt.Println("------------------------------------------------------------the", i, "th round starts------------------------------------------------------------------")
  699. arrival_msg_number = 0
  700. learning_msg_number = 0
  701. online_msg_number = 0
  702. rejoin_number = 0
  703. round = i
  704. covNumber = 0
  705. realNumber = 0
  706. lastTimesIndex = test2(roundEndTime, lastTimesIndex)
  707. fmt.Println("lastTimesIndex=", lastTimesIndex)
  708. roundEndTime = timeAddOneHour(roundEndTime)
  709. fmt.Println("roundEndTime=", roundEndTime)
  710. fmt.Println("status len", len(Status), "NameListlen ", len(NameList), "status_learning len", len(status_learning))
  711. finalSend()
  712. refresh_status_learning()
  713. fmt.Println("Status len", len(Status))
  714. StatusChangeOrNot_arrival_to_learning()
  715. StatusChangeOrNot_learning_to_online()
  716. StatusChangeOrNot_eliminated()
  717. fmt.Println("covNumber=", covNumber)
  718. fmt.Println("realNumber=", realNumber)
  719. fmt.Println("arrival_msg_number=", arrival_msg_number, "learning_msg_number=", learning_msg_number, "online_msg_number=", online_msg_number)
  720. log.Println("rejoin_number", rejoin_number)
  721. fmt.Println("------------------------------------------------------------the", i, "th round ends------------------------------------------------------------------")
  722. }
  723. }