main.go 20 KB


  1. /*
  2. 遇到mod的问题,先ctrl+s保存一下,然后直接go run main.go 如果不行的话,再使用go mod init,之后再go run main.go
  3. */
  4. package main
  5. import (
  6. "encoding/json"
  7. "fmt"
  8. "io"
  9. "log"
  10. "net"
  11. "os"
  12. "strconv"
  13. "strings"
  14. "sync"
  15. "time"
  16. "github.com/gomodule/redigo/redis"
  17. )
  18. var channelSize int
  19. var Status map[string]int //store each users' status
  20. var status_learning []string //users who are in learning phase
  21. //all users and theri ip address
  22. var NameList []string
  23. var IpList map[string]string
  24. //test2 uses this hashmap to get from Sample Data, in each round, there are how many messages
  25. var wantToSendHash map[string]int
  26. var packetLength int
  27. //get connection with redis
  28. func GetConn() redis.Conn {
  29. conn, err := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
  30. if err != nil {
  31. fmt.Println("redis err", err)
  32. return nil
  33. }
  34. return conn
  35. }
  36. //change the interface type into string type
  37. func interfaceToString(x interface{}) string {
  38. xx, err1 := redis.Strings(x, nil)
  39. if err1 != nil {
  40. fmt.Println("interfaceToString err", err1)
  41. fmt.Println("xx=", xx, "xx[0]=", xx[0])
  42. }
  43. xxx := xx[0]
  44. return xxx
  45. }
  46. var times int //used to set ip address
  47. //set ip address
  48. func setIp() string {
  49. str := strconv.Itoa(times)
  50. times = times + 1
  51. return str
  52. }
  53. var rejoin_number int
  54. //set ip address for users
  55. func getIp(uid string) string {
  56. if IpList[uid] != "" {
  57. if Status[uid] != 3 {
  58. return IpList[uid]
  59. } else { //rejoin users
  60. rejoin_number++
  61. Status[uid] = 0
  62. x := setIp()
  63. IpList[uid] = x
  64. db1_key_channel <- uid
  65. db1_val_channel <- x
  66. return x
  67. }
  68. } else { //new users
  69. NameList = append(NameList, uid)
  70. Status[uid] = 0
  71. x := setIp()
  72. IpList[uid] = x
  73. db1_key_channel <- uid
  74. db1_val_channel <- x
  75. return x
  76. }
  77. }
  78. //db2, client queue;cache time and messages in clients
  79. func pushIntoDB2(uid, time, msg string) {
  80. k1 := uid + "_msg"
  81. k2 := uid + "_time"
  82. db2_msg_key_channel <- k1
  83. db2_time_key_channel <- k2
  84. db2_msg_val_channel <- msg
  85. db2_time_val_channel <- time
  86. }
  87. //4 Variables used in itest2
  88. var User_id_queue string
  89. var Msg_queue string
  90. var Timestamp_queue string
  91. var User_Ip string
  92. //used in db1,key is uid; val is ip
  93. var db1_key_channel chan string
  94. var db1_val_channel chan string
  95. var mutex_db1 sync.Mutex
  96. func db1_Set_Goroutine() {
  97. defer from_db0_input_wg.Done()
  98. conn := GetConn()
  99. conn.Do("select", 1)
  100. for {
  101. mutex_db1.Lock()
  102. x, ok1 := <-db1_key_channel
  103. y, ok2 := <-db1_val_channel
  104. mutex_db1.Unlock()
  105. if !ok1 || !ok2 {
  106. return
  107. }
  108. conn.Do("set", x, y)
  109. }
  110. }
  111. //db2's channels, used to store timestamp in db2
  112. var db2_time_key_channel chan string
  113. var db2_time_val_channel chan string
  114. var mutex_db2_time sync.Mutex
  115. func db2_Time_Set_Goroutine() {
  116. defer from_db0_input_wg.Done()
  117. conn := GetConn()
  118. conn.Do("select", 2)
  119. for {
  120. mutex_db2_time.Lock()
  121. x, ok1 := <-db2_time_key_channel
  122. y, ok2 := <-db2_time_val_channel
  123. mutex_db2_time.Unlock()
  124. if !ok1 || !ok2 {
  125. return
  126. }
  127. conn.Do("lpush", x, y)
  128. }
  129. }
  130. //db2's channel. used to store messages in db2
  131. var db2_msg_key_channel chan string
  132. var db2_msg_val_channel chan string
  133. var mutex_db2_msg sync.Mutex
  134. func db2_Msg_Set_Goroutine() {
  135. defer from_db0_input_wg.Done()
  136. conn := GetConn()
  137. conn.Do("select", 2)
  138. for {
  139. mutex_db2_msg.Lock()
  140. x, ok1 := <-db2_msg_key_channel
  141. y, ok2 := <-db2_msg_val_channel
  142. mutex_db2_msg.Unlock()
  143. if !ok1 || !ok2 {
  144. return
  145. }
  146. conn.Do("lpush", x, y)
  147. }
  148. }
  149. //test2 is used to get messages from sample data in each round
  150. //get all messages in wach hour
  151. func test2(finalTime string, lastTimesIndex int) (finalIndex int) {
  152. fmt.Println("text2() starts")
  153. //goroutine六个channel的初始化
  154. db1_key_channel = make(chan string, channelSize)
  155. db1_val_channel = make(chan string, channelSize)
  156. db2_time_key_channel = make(chan string, channelSize)
  157. db2_time_val_channel = make(chan string, channelSize)
  158. db2_msg_key_channel = make(chan string, channelSize)
  159. db2_msg_val_channel = make(chan string, channelSize)
  160. wantToSendHash = make(map[string]int)
  161. conn := GetConn()
  162. for i := lastTimesIndex; ; i++ {
  163. //Used to determine if a timeout has occurred
  164. time1, err1 := conn.Do("hmget", i, "Timestamp")
  165. if err1 != nil {
  166. fmt.Println("Timestamp hmget err,err1=", err1)
  167. }
  168. Timestamp_queue = interfaceToString(time1)
  169. //timeout
  170. if Timestamp_queue > finalTime {
  171. finalIndex = i
  172. fmt.Println("i", i)
  173. break
  174. } else {
  175. //no timeout
  176. u_id, err := conn.Do("hmget", i, "User_id")
  177. if err != nil {
  178. fmt.Println("u_id hmget err,err1=", err)
  179. }
  180. User_id_queue = interfaceToString(u_id)
  181. wantToSendHash[User_id_queue] = 1
  182. messages, err2 := conn.Do("hmget", i, "Hashtags")
  183. if err2 != nil {
  184. fmt.Println("message hmget err,err2=", err2)
  185. }
  186. Msg_queue = interfaceToString(messages)
  187. User_Ip = getIp(User_id_queue)
  188. pushIntoDB2(User_id_queue, Timestamp_queue, Msg_queue)
  189. }
  190. }
  191. fmt.Println("for loop ends")
  192. //until now, all messages in this round have been taken from sample data;
  193. for i := 0; i < 130; i++ {
  194. from_db0_input_wg.Add(3)
  195. go db1_Set_Goroutine()
  196. go db2_Time_Set_Goroutine()
  197. go db2_Msg_Set_Goroutine()
  198. }
  199. close(db1_key_channel)
  200. close(db1_val_channel)
  201. close(db2_time_key_channel)
  202. close(db2_time_val_channel)
  203. close(db2_msg_key_channel)
  204. close(db2_msg_val_channel)
  205. from_db0_input_wg.Wait()
  206. fmt.Println("text2() ends")
  207. return
  208. }
  209. //used to porcess these messages from taxt2()
  210. var real_msg_channel chan string
  211. var cover_msg_channel chan string
  212. //used to merged messages
  213. type megred struct {
  214. Megred_Timestamp []string
  215. Megred_Hashtags []string
  216. Megred_User_Id string
  217. Megred_Ip string
  218. Cover_Msg string //if it is a cover message
  219. }
  220. type users struct {
  221. Uid string
  222. Cover int
  223. Delay int
  224. Queue []string
  225. }
  226. type group struct {
  227. Group_id int
  228. Users_list []*users //
  229. Pattern []int //
  230. }
  231. var from_db0_input_wg sync.WaitGroup
  232. //used in sendOut_real; put all uid in wantToSendHash into real_msg_channel
  233. func putWantToSendHashIntoChannel(real_msg_channel chan<- string) {
  234. fmt.Println("len(wantToSendHash)", len(wantToSendHash))
  235. for i := range wantToSendHash {
  236. real_msg_channel <- i
  237. }
  238. fmt.Println("already pu all uid in wantToSendHash into channel")
  239. }
  240. //used in sendOut_cov; all uid in status_learning should send message(cover/real); if they don't have real. then they must send cover message
  241. func putStatus_learningIntoChannel(cover_msg_channel chan<- string) {
  242. fmt.Println("len(status_learning)", len(status_learning))
  243. for _, v := range status_learning {
  244. cover_msg_channel <- v
  245. }
  246. fmt.Println("already all uid in status_learning into channel")
  247. }
  248. //connect to agent
  249. func getHttpConnection() net.Conn {
  250. httpConn, err := net.Dial("tcp", "192.168.32.144:20000")
  251. if err != nil {
  252. fmt.Println("http conn err,dial 192.168.32.144:20000 failed", err)
  253. return nil
  254. }
  255. return httpConn
  256. }
  257. //taken all message in db2, merge them into structure
  258. func merge(v string, conn redis.Conn) *megred {
  259. conn.Do("select", "2")
  260. a := v + "_time"
  261. b := v + "_msg"
  262. l1, err_L1 := conn.Do("LLEN", a)
  263. if err_L1 != nil {
  264. fmt.Println("err_L1=", err_L1)
  265. }
  266. length1, err_length1 := redis.Int(l1, nil)
  267. if err_length1 != nil {
  268. fmt.Println("err_length1=", err_length1)
  269. }
  270. l2, err_L2 := conn.Do("LLEN", b)
  271. if err_L2 != nil {
  272. fmt.Println("err_L2=", err_L2)
  273. }
  274. length2, err_length2 := redis.Int(l2, nil)
  275. if err_length2 != nil {
  276. fmt.Println("err_length2=", err_length2)
  277. }
  278. time_middle, _ := conn.Do("lrange", a, 0, length1-1)
  279. x, err_x := redis.Strings(time_middle, nil)
  280. if err_x != nil {
  281. fmt.Println("err_x=", err_x)
  282. }
  283. aa := x
  284. msg_middle, _ := conn.Do("lrange", b, 0, length2-1)
  285. y, errr_y := redis.Strings(msg_middle, nil)
  286. if errr_y != nil {
  287. fmt.Println("errr_y=", errr_y)
  288. }
  289. bb := y
  290. //clean two queue
  291. for i := 0; i < length1; i++ {
  292. conn.Do("rpop", a)
  293. }
  294. for i := 0; i < length2; i++ {
  295. conn.Do("rpop", b)
  296. }
  297. //do merging
  298. ip_middle := IpList[v]
  299. merged_result := &megred{
  300. aa,
  301. bb,
  302. v,
  303. ip_middle,
  304. "no", //only merge real messages; so not a cover
  305. }
  306. return merged_result
  307. }
  308. var cov_lock sync.Mutex
  309. var covNumber int
  310. var real_lock sync.Mutex
  311. var realNumber int
  312. var arrival_msg_number int
  313. var learning_msg_number int
  314. var online_msg_number int
  315. var learning_msg_number_wg sync.Mutex
  316. var online_msg_number_wg sync.Mutex
  317. var test_list []string
  318. //send real messages out
  319. func sendOut_real(list <-chan string, c net.Conn, cover *megred) {
  320. defer finalSend_wg_real.Done()
  321. time.Sleep(time.Millisecond * 100)
  322. conn := GetConn()
  323. conn.Do("select", "2")
  324. for {
  325. v, ok := <-list
  326. if !ok {
  327. return
  328. }
  329. status_v := Status[v]
  330. switch status_v {
  331. case 0:
  332. arrival_msg_number++
  333. case 1:
  334. learning_msg_number_wg.Lock()
  335. learning_msg_number++
  336. learning_msg_number_wg.Unlock()
  337. case 2:
  338. online_msg_number_wg.Lock()
  339. online_msg_number++
  340. test_list = append(test_list, v)
  341. online_msg_number_wg.Unlock()
  342. }
  343. rst := merge(v, conn) //get merged message
  344. jsons, err_json := json.Marshal(*rst)
  345. if err_json != nil {
  346. fmt.Println("err_json=", err_json)
  347. }
  348. //do padding
  349. total_length := packetLength
  350. padding_length := total_length - len(jsons)
  351. if jsons[len(jsons)-1] != 125 {
  352. fmt.Println("rst=", rst)
  353. break
  354. }
  355. //it is too long, we use a cover message for this too loooooong real message
  356. if padding_length <= 0 {
  357. cov_mid := *cover
  358. cov_mid.Megred_User_Id = v
  359. cov_mid.Cover_Msg = "no"
  360. cov_mid.Megred_Hashtags = []string{"[other]"}
  361. cov_mid.Megred_Ip = IpList[v]
  362. cov_mid.Megred_Timestamp = []string{"000000000"}
  363. jsons, err_json := json.Marshal(cov_mid)
  364. if err_json != nil {
  365. fmt.Println("err_json=", err_json)
  366. }
  367. total_length := packetLength
  368. padding_length := total_length - len(jsons)
  369. pd := make([]byte, padding_length)
  370. jsons = append(jsons, pd...)
  371. if len(jsons) != packetLength {
  372. fmt.Println("err,cover=", jsons)
  373. }
  374. c.Write(jsons)
  375. real_lock.Lock()
  376. realNumber++
  377. real_lock.Unlock()
  378. continue
  379. }
  380. pd := make([]byte, padding_length)
  381. jsons = append(jsons, pd...)
  382. if len(jsons) != packetLength {
  383. fmt.Println("err出错了,长度不对,rst=", rst)
  384. }
  385. c.Write(jsons)
  386. real_lock.Lock()
  387. realNumber++
  388. real_lock.Unlock()
  389. }
  390. }
  391. //used to send out cover messages
  392. func sendOut_cover(list <-chan string, c net.Conn, cover *megred) {
  393. defer finalSend_wg_cov.Done()
  394. time.Sleep(time.Millisecond * 100)
  395. for {
  396. v, ok := <-list
  397. if !ok {
  398. break
  399. } else {
  400. if wantToSendHash[v] == 1 {
  401. continue //user in learning phase and has a real message in this round.then he don's need to send cover message
  402. } else {
  403. //should send cover message
  404. status_v := Status[v]
  405. switch status_v {
  406. case 1:
  407. learning_msg_number_wg.Lock()
  408. learning_msg_number++
  409. learning_msg_number_wg.Unlock()
  410. }
  411. cov_mid := *cover
  412. cov_mid.Megred_User_Id = v
  413. cov_mid.Cover_Msg = "yes"
  414. cov_mid.Megred_Hashtags = []string{"[cover]"}
  415. cov_mid.Megred_Ip = IpList[v]
  416. cov_mid.Megred_Timestamp = []string{"000000000"}
  417. jsons, err_json := json.Marshal(cov_mid)
  418. if err_json != nil {
  419. fmt.Println("err_json=", err_json)
  420. }
  421. total_length := packetLength
  422. padding_length := total_length - len(jsons)
  423. pd := make([]byte, padding_length)
  424. jsons = append(jsons, pd...)
  425. if len(jsons) != packetLength {
  426. fmt.Println("err,cover=", jsons)
  427. }
  428. c.Write(jsons)
  429. cov_lock.Lock()
  430. covNumber++
  431. cov_lock.Unlock()
  432. }
  433. }
  434. }
  435. }
  436. //tell agent, this round ends, it also needs to be padded
  437. func roundEnd(c net.Conn) {
  438. fmt.Println("send roundEnd")
  439. x := []byte("roundEnd")
  440. total_length := packetLength
  441. padding_length := total_length - len(x)
  442. pd := make([]byte, padding_length)
  443. x = append(x, pd...)
  444. if len(x) != packetLength {
  445. fmt.Println("err,x=", x)
  446. }
  447. c.Write(x)
  448. }
  449. var finalSend_wg_cov sync.WaitGroup
  450. var finalSend_wg_real sync.WaitGroup
  451. //call all send actions
  452. func finalSend() {
  453. c := getHttpConnection()
  454. cover := &megred{
  455. []string{},
  456. []string{},
  457. "",
  458. "cover",
  459. "yes",
  460. }
  461. real_msg_channel = make(chan string)
  462. cover_msg_channel = make(chan string)
  463. for i := 0; i < 1000; i++ {
  464. finalSend_wg_real.Add(1)
  465. finalSend_wg_cov.Add(1)
  466. go sendOut_real(real_msg_channel, c, cover)
  467. go sendOut_cover(cover_msg_channel, c, cover)
  468. }
  469. putWantToSendHashIntoChannel(real_msg_channel)
  470. putStatus_learningIntoChannel(cover_msg_channel)
  471. close(real_msg_channel)
  472. close(cover_msg_channel)
  473. finalSend_wg_real.Wait()
  474. finalSend_wg_cov.Wait()
  475. time.Sleep(time.Millisecond * 100)
  476. roundEnd(c)
  477. }
  478. //init db1,2
  479. func initDB1_DB2() {
  480. conn := GetConn()
  481. conn.Do("select", "1")
  482. conn.Do("flushdb")
  483. conn.Do("keys *")
  484. conn.Do("select", "2")
  485. conn.Do("flushdb")
  486. conn.Do("keys *")
  487. }
  488. //some init works
  489. func start() {
  490. initDB1_DB2()
  491. times = 0
  492. IpList = make(map[string]string)
  493. Status = make(map[string]int)
  494. NameList = make([]string, 0)
  495. status_learning = make([]string, 0)
  496. packetLength = 3000
  497. }
  498. //Calculate the time, because text2 takes messages within one hour each time, and needs this time as deadline
  499. func timeAddOneHour(input string) (outpute string) {
  500. loc, err_loc := time.LoadLocation("Local")
  501. if err_loc != nil {
  502. fmt.Println("err_loc=", err_loc)
  503. }
  504. the_time, err_the_time := time.ParseInLocation("20060102150405", input, loc)
  505. if err_the_time != nil {
  506. fmt.Println("err_the_time=", err_the_time)
  507. }
  508. //1 hour; 2 hours; 30 min
  509. h, _ := time.ParseDuration("1h")
  510. h1 := the_time.Add(1 * h)
  511. //h, _ := time.ParseDuration("1m")
  512. //h1 := the_time.Add(30 * h)
  513. //h, _ := time.ParseDuration("1h")
  514. //h1 := the_time.Add(2 * h)
  515. timeString := h1.Format("20060102150405")
  516. outpute = timeString
  517. return
  518. }
  519. var learning_phase_start_round int
  520. var round int
  521. //reaceive the feedback from agent; this is for users from arrival to learning
  522. func StatusChangeOrNot_arrival_to_learning() {
  523. defer log.Println("StatusChangeOrNot_arrival_to_learning() ends")
  524. fmt.Println("starts StatusChangeOrNot_arrival_to_learning()")
  525. listener, err := net.Listen("tcp", "192.168.32.144:20002")
  526. if err != nil {
  527. fmt.Println("StatusChangeOrNot_arrival_to_learning(),as server,listen 192.168.32.144:20002 fails", err)
  528. return
  529. }
  530. conn, err := listener.Accept()
  531. if err != nil {
  532. fmt.Println("accept failed,StatusChangeOrNot_arrival_to_learning conn err", err)
  533. return
  534. }
  535. var tmp [10000]byte
  536. res := ""
  537. log.Println("all users who have changed their status")
  538. for {
  539. n, err := conn.Read(tmp[:])
  540. if err != nil {
  541. fmt.Println("read from conn failed", err)
  542. break
  543. }
  544. //no change, directly return
  545. if string(tmp[:8]) == "noChange" {
  546. log.Println("StatusChangeOrNot_arrival_to_learning ends,no user changes status")
  547. listener.Close()
  548. return
  549. } else {
  550. result := string(tmp[:n])
  551. res = res + result
  552. if string(tmp[:9]) == "changeEnd" {
  553. fmt.Println("received all status")
  554. listener.Close()
  555. break
  556. }
  557. }
  558. }
  559. //space
  560. res = strings.Replace(res, " ", "", -1)
  561. //line
  562. res = strings.Replace(res, "\n", "", -1)
  563. str_arr := strings.Split(res, ",")
  564. learning_phase_start_round = round
  565. for _, str := range str_arr {
  566. Status[str] = 1
  567. status_learning = append(status_learning, str)
  568. }
  569. fmt.Println("StatusChangeOrNot_arrival_to_learning ends。someone has changed his status, len(status_learning)=", len(status_learning))
  570. }
  571. //feedback, users who have been eliminated in online phase
  572. func StatusChangeOrNot_eliminated() {
  573. defer fmt.Println("StatusChangeOrNot_eliminated ends")
  574. log.Println("start StatusChangeOrNot_eliminated()")
  575. listener, err := net.Listen("tcp", "192.168.32.144:20002")
  576. if err != nil {
  577. fmt.Println("StatusChangeOrNot_eliminated(),as server,listen 192.168.32.144:20002 failed", err)
  578. return
  579. }
  580. conn, err := listener.Accept()
  581. if err != nil {
  582. fmt.Println("accept failed,StatusChangeOrNot_eliminated conn err", err)
  583. return
  584. }
  585. var tmp [10000]byte
  586. res := ""
  587. log.Println("eliminated uid")
  588. for {
  589. n, err := conn.Read(tmp[:])
  590. if err != nil {
  591. fmt.Println("read from conn failed", err)
  592. break
  593. }
  594. //no change
  595. if string(tmp[:8]) == "noChange" {
  596. fmt.Println("StatusChangeOrNot_eliminatedends,no change")
  597. listener.Close()
  598. return
  599. } else {
  600. result := string(tmp[:n])
  601. res = res + result
  602. if string(tmp[:9]) == "changeEnd" {
  603. fmt.Println("received all status")
  604. listener.Close()
  605. break
  606. }
  607. }
  608. }
  609. //space
  610. res = strings.Replace(res, " ", "", -1)
  611. //line
  612. res = strings.Replace(res, "\n", "", -1)
  613. str_arr := strings.Split(res, ",")
  614. //set status =3, which means eliminated
  615. for _, str := range str_arr {
  616. Status[str] = 3
  617. }
  618. log.Println("StatusChangeOrNot_eliminatedends,len(str_arr)=", len(str_arr))
  619. log.Println("the eliminated users are", str_arr)
  620. }
  621. //Learning phase's length,24/48/72 rounds
  622. func refresh_status_learning() {
  623. if round-learning_phase_start_round == 24 { //24,48,72
  624. fmt.Println("start refresh_status_learning")
  625. fmt.Println("status_learning is refreshed,len(status_learning)=", len(status_learning))
  626. status_learning = make([]string, 0)
  627. }
  628. }
  629. //users from learning to online phase
  630. func StatusChangeOrNot_learning_to_online() {
  631. defer log.Println("StatusChangeOrNot_learning_to_online() ends")
  632. log.Println("进入StatusChangeOrNot_learning_to_online() starts")
  633. listener, err := net.Listen("tcp", "192.168.32.144:20002")
  634. if err != nil {
  635. fmt.Println("StatusChangeOrNot_learning_to_online(),as server,listen 192.168.32.144:20002 failed", err)
  636. return
  637. }
  638. conn, err := listener.Accept()
  639. if err != nil {
  640. fmt.Println("accept failed", err)
  641. return
  642. }
  643. var tmp [10000]byte
  644. res := ""
  645. for {
  646. n, err := conn.Read(tmp[:])
  647. if err != nil {
  648. fmt.Println("read from conn failed", err)
  649. break
  650. }
  651. //no change
  652. if string(tmp[:8]) == "noChange" {
  653. log.Println("no change")
  654. listener.Close()
  655. return
  656. } else {
  657. result := string(tmp[:n])
  658. res = res + result
  659. if string(tmp[:9]) == "changeEnd" {
  660. fmt.Println("received all status")
  661. listener.Close()
  662. break
  663. }
  664. }
  665. }
  666. //space
  667. res = strings.Replace(res, " ", "", -1)
  668. //line
  669. res = strings.Replace(res, "\n", "", -1)
  670. str_arr := strings.Split(res, ",")
  671. //set status to 2,namely online
  672. number := 0
  673. for i := 0; i < len(str_arr)-1; i++ {
  674. Status[str_arr[i]] = 2
  675. number++
  676. }
  677. fmt.Println("from learning to online users' number=", number)
  678. fmt.Println("someone has changed into online phase")
  679. }
  680. func main() {
  681. //log
  682. logFile, err := os.OpenFile("log.txt", os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666)
  683. if err != nil {
  684. panic(err)
  685. }
  686. defer logFile.Close()
  687. mw := io.MultiWriter(os.Stdout, logFile)
  688. log.SetOutput(mw)
  689. start()
  690. channelSize = 150000
  691. //1st test point,1101
  692. lastTimesIndex := 0
  693. roundEndTime := "20121101130000"
  694. //2ed test point,1110
  695. //lastTimesIndex := 1479975
  696. //roundEndTime := "20121110130000"
  697. //3rd test point,1120
  698. //lastTimesIndex := 3751916
  699. //roundEndTime := "20121120130000"
  700. for i := 0; i < 35; i++ {
  701. fmt.Println("------------------------------------------------------------the", i, "th round starts------------------------------------------------------------------")
  702. arrival_msg_number = 0
  703. learning_msg_number = 0
  704. online_msg_number = 0
  705. rejoin_number = 0
  706. round = i
  707. covNumber = 0
  708. realNumber = 0
  709. lastTimesIndex = test2(roundEndTime, lastTimesIndex)
  710. fmt.Println("lastTimesIndex=", lastTimesIndex)
  711. roundEndTime = timeAddOneHour(roundEndTime)
  712. fmt.Println("roundEndTime=", roundEndTime)
  713. fmt.Println("status len", len(Status), "NameListlen ", len(NameList), "status_learning len", len(status_learning))
  714. finalSend()
  715. refresh_status_learning()
  716. fmt.Println("Status len", len(Status))
  717. StatusChangeOrNot_arrival_to_learning()
  718. StatusChangeOrNot_learning_to_online()
  719. StatusChangeOrNot_eliminated()
  720. fmt.Println("covNumber=", covNumber)
  721. fmt.Println("realNumber=", realNumber)
  722. fmt.Println("arrival_msg_number=", arrival_msg_number, "learning_msg_number=", learning_msg_number, "online_msg_number=", online_msg_number)
  723. log.Println("rejoin_number", rejoin_number)
  724. fmt.Println("------------------------------------------------------------the", i, "th round ends------------------------------------------------------------------")
  725. }
  726. }