12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098 |
- package main
- import (
- "encoding/json"
- "fmt"
- "io"
- "io/ioutil"
- "log"
- "net"
- "os"
- "strconv"
- "strings"
- "sync"
- "time"
- "github.com/gomodule/redigo/redis"
- )
// Package-level state shared by the whole simulation.
var (
	// channelSize is the buffer capacity given to the Redis writer channels
	// that test2 creates.
	channelSize int

	// Status maps a user id to its lifecycle state. The values written in this
	// file are 0 (set by getIp), 1 (arrival -> learning), 2 (learning -> online)
	// and 3 (eliminated).
	Status map[string]int

	// status_learning lists the user ids that were moved to state 1; cover
	// messages are generated for them in finalSend.
	status_learning []string

	// NameList records every user id the first time getIp sees it.
	NameList []string

	// IpList maps a user id to the synthetic "IP" string handed out by setIp.
	IpList map[string]string

	// wantToSendHash is used as a set (value 1) of the user ids that have a
	// real message in the current round; test2 recreates it every round.
	wantToSendHash map[string]int

	// packetLength is the fixed size every outgoing packet is padded to.
	packetLength int
)
- func GetConn() redis.Conn {
- conn, err := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
- if err != nil {
- fmt.Println("connect redis error", err)
- return nil
- }
- return conn
- }
- func interfaceToString(x interface{}) string {
- xx, err1 := redis.Strings(x, nil)
- if err1 != nil {
- fmt.Println("interfaceToString", err1)
- fmt.Println("xx=", xx, "xx[0]=", xx[0])
- }
- xxx := xx[0]
- return xxx
- }
// times is the number of synthetic addresses handed out so far; it is also the
// next value setIp will return.
var times int

// setIp returns the current counter value as a decimal string and advances the
// counter by one. The result is used as the user's synthetic "IP".
func setIp() string {
	current := times
	times++
	return strconv.Itoa(current)
}
- var rejoin_number int
- func getIp(uid string) string {
- if IpList[uid] != "" {
- if Status[uid] != 3 {
- return IpList[uid]
- } else {
- rejoin_number++
- Status[uid] = 0
- x := setIp()
- IpList[uid] = x
- db1_key_channel <- uid
- db1_val_channel <- x
- return x
- }
- } else {
- NameList = append(NameList, uid)
- Status[uid] = 0
- x := setIp()
- IpList[uid] = x
- db1_key_channel <- uid
- db1_val_channel <- x
- return x
- }
- }
- func pushIntoDB2(uid, time, msg string) {
- k1 := uid + "_msg"
- k2 := uid + "_time"
- db2_msg_key_channel <- k1
- db2_time_key_channel <- k2
- db2_msg_val_channel <- msg
- db2_time_val_channel <- time
- }
// Scratch variables holding the fields of the record test2 is currently
// processing.
var (
	// User_id_queue is the "User_id" field of the current record.
	User_id_queue string
	// Msg_queue is the "Hashtags" field of the current record.
	Msg_queue string
	// Timestamp_queue is the "Timestamp" field of the current record.
	Timestamp_queue string
	// User_Ip is the address getIp returned for the current record's user.
	User_Ip string
)
- var db1_key_channel chan string
- var db1_val_channel chan string
- var mutex_db1 sync.Mutex
- func db1_Set_Goroutine() {
- defer from_db0_input_wg.Done()
- conn := GetConn()
- conn.Do("select", 1)
- for {
- mutex_db1.Lock()
- x, ok1 := <-db1_key_channel
- y, ok2 := <-db1_val_channel
- mutex_db1.Unlock()
- if !ok1 || !ok2 {
- return
- }
- conn.Do("set", x, y)
- }
- }
- var db2_time_key_channel chan string
- var db2_time_val_channel chan string
- var mutex_db2_time sync.Mutex
- func db2_Time_Set_Goroutine() {
- defer from_db0_input_wg.Done()
- conn := GetConn()
- conn.Do("select", 2)
- for {
- mutex_db2_time.Lock()
- x, ok1 := <-db2_time_key_channel
- y, ok2 := <-db2_time_val_channel
- mutex_db2_time.Unlock()
- if !ok1 || !ok2 {
- return
- }
- conn.Do("lpush", x, y)
- }
- }
- var db2_msg_key_channel chan string
- var db2_msg_val_channel chan string
- var mutex_db2_msg sync.Mutex
- func db2_Msg_Set_Goroutine() {
- defer from_db0_input_wg.Done()
- conn := GetConn()
- conn.Do("select", 2)
- for {
- mutex_db2_msg.Lock()
- x, ok1 := <-db2_msg_key_channel
- y, ok2 := <-db2_msg_val_channel
- mutex_db2_msg.Unlock()
- if !ok1 || !ok2 {
- return
- }
- conn.Do("lpush", x, y)
- }
- }
- // Exactly the same as in v1
- func test2(finalTime string, lastTimesIndex int) (finalIndex int) {
- fmt.Println("text2()")
- db1_key_channel = make(chan string, channelSize)
- db1_val_channel = make(chan string, channelSize)
- db2_time_key_channel = make(chan string, channelSize)
- db2_time_val_channel = make(chan string, channelSize)
- db2_msg_key_channel = make(chan string, channelSize)
- db2_msg_val_channel = make(chan string, channelSize)
- wantToSendHash = make(map[string]int)
- conn := GetConn()
- for i := lastTimesIndex; ; i++ {
- time1, err1 := conn.Do("hmget", i, "Timestamp")
- if err1 != nil {
- fmt.Println("Timestamp hmget err1=", err1)
- }
- Timestamp_queue = interfaceToString(time1)
- if Timestamp_queue > finalTime {
- finalIndex = i
- fmt.Println("i", i)
- break
- } else {
- u_id, err := conn.Do("hmget", i, "User_id")
- if err != nil {
- fmt.Println("u_id hmget ,err1=", err)
- }
- User_id_queue = interfaceToString(u_id)
- wantToSendHash[User_id_queue] = 1
- messages, err2 := conn.Do("hmget", i, "Hashtags")
- if err2 != nil {
- fmt.Println("message hmget ,err2=", err2)
- }
- Msg_queue = interfaceToString(messages)
- User_Ip = getIp(User_id_queue)
- pushIntoDB2(User_id_queue, Timestamp_queue, Msg_queue)
- }
- }
- for i := 0; i < 130; i++ {
- from_db0_input_wg.Add(3)
- go db1_Set_Goroutine()
- go db2_Time_Set_Goroutine()
- go db2_Msg_Set_Goroutine()
- }
- close(db1_key_channel)
- close(db1_val_channel)
- close(db2_time_key_channel)
- close(db2_time_val_channel)
- close(db2_msg_key_channel)
- close(db2_msg_val_channel)
- from_db0_input_wg.Wait()
- fmt.Println("text2() ends")
- return
- }
// Channels feeding the sender workers started by finalSend.
var (
	// real_msg_channel carries the ids of users that have real messages.
	real_msg_channel chan string
	// cover_msg_channel carries the ids of users in the learning list.
	cover_msg_channel chan string
)

// megred is the JSON payload of one outgoing packet: all messages of one user
// merged together, or a cover message. The JSON keys are the field names, and
// the struct is also built with positional literals, so neither the names nor
// the order of the fields may change.
type megred struct {
	// Megred_Timestamp holds the timestamps of the merged messages.
	Megred_Timestamp []string
	// Megred_Hashtags holds the message bodies of the merged messages.
	Megred_Hashtags []string
	// Megred_User_Id is the id of the user the packet belongs to.
	Megred_User_Id string
	// Megred_Ip is the user's synthetic address taken from IpList.
	Megred_Ip string
	// Cover_Msg is "yes" for cover traffic and "no" otherwise.
	Cover_Msg string
}

// from_db0_input_wg tracks the Redis writer goroutines started by test2.
var from_db0_input_wg sync.WaitGroup
- //Exactly the same as in v1
- func putWantToSendHashIntoChannel(real_msg_channel chan<- string) {
- fmt.Println("wantToSendHash", len(wantToSendHash))
- for i := range wantToSendHash {
- real_msg_channel <- i
- }
- }
- //Exactly the same as in v1
- func putStatus_learningIntoChannel(cover_msg_channel chan<- string) {
- fmt.Println("status_learning", len(status_learning))
- for _, v := range status_learning {
- cover_msg_channel <- v
- }
- }
// getHttpConnection opens a TCP connection to 192.168.32.144:20000, the peer
// every packet of a round is written to. It prints the error and returns nil
// when the dial fails.
// (Original note: exactly the same as in v1.)
func getHttpConnection() net.Conn {
	const receiverAddr = "192.168.32.144:20000"
	c, dialErr := net.Dial("tcp", receiverAddr)
	if dialErr == nil {
		return c
	}
	fmt.Println("conn errdial 192.168.32.144:20000 failed", dialErr)
	return nil
}
- ////Exactly the same as in v1
- func merge(v string, conn redis.Conn) *megred {
- conn.Do("select", "2")
- a := v + "_time"
- b := v + "_msg"
- l1, err_L1 := conn.Do("LLEN", a)
- if err_L1 != nil {
- fmt.Println("err_L1=", err_L1)
- }
- length1, err_length1 := redis.Int(l1, nil)
- if err_length1 != nil {
- fmt.Println("err_length1=", err_length1)
- }
- l2, err_L2 := conn.Do("LLEN", b)
- if err_L2 != nil {
- fmt.Println("err_L2=", err_L2)
- }
- length2, err_length2 := redis.Int(l2, nil)
- if err_length2 != nil {
- fmt.Println("err_length2=", err_length2)
- }
- time_middle, _ := conn.Do("lrange", a, 0, length1-1)
- x, err_x := redis.Strings(time_middle, nil)
- if err_x != nil {
- fmt.Println("err_x=", err_x)
- }
- aa := x
- msg_middle, _ := conn.Do("lrange", b, 0, length2-1)
- y, errr_y := redis.Strings(msg_middle, nil)
- if errr_y != nil {
- fmt.Println("errr_y=", errr_y)
- }
- bb := y
- for i := 0; i < length1; i++ {
- conn.Do("rpop", a)
- }
- for i := 0; i < length2; i++ {
- conn.Do("rpop", b)
- }
- ip_middle := IpList[v]
- merged_result := &megred{
- aa,
- bb,
- v,
- ip_middle,
- "no",
- }
- return merged_result
- }
// Per-round counters and the mutexes guarding them. main resets the counters
// at the start of every round.
var (
	// cov_lock guards covNumber.
	cov_lock sync.Mutex
	// covNumber counts the cover packets written by sendOut_cover.
	covNumber int

	// real_lock guards realNumber.
	real_lock sync.Mutex
	// realNumber counts the real packets written directly by sendOut_real.
	realNumber int

	// arrival_msg_number, learning_msg_number and online_msg_number count the
	// users handled per state (0, 1 and 2 respectively).
	arrival_msg_number  int
	learning_msg_number int
	online_msg_number   int

	// Despite the "_wg" suffix these are mutexes: they guard
	// learning_msg_number and online_msg_number (plus test_list).
	learning_msg_number_wg sync.Mutex
	online_msg_number_wg   sync.Mutex

	// test_list collects the ids of the online users seen by sendOut_real.
	test_list []string
)
// users is the per-user scheduler state. (Original note: unlike in V1, every
// user has such a structure.)
type users struct {
	// Uid is the user id.
	Uid string
	// Bonus counts queued packets sent while the user was allowed to send; it
	// wraps to 0 at 10. Only used for the bonus-based V3.
	Bonus int
	// Chance is the remaining budget: one unit is spent per cover packet and
	// per delayed round.
	Chance int
	// Queue holds the padded packets waiting to be sent for this user.
	Queue [][]byte
	// Group is the group the user was assigned to.
	Group *group
	// Used_cover counts the cover packets sent for this user.
	Used_cover int
	// Used_delay counts the rounds in which a queued packet was held back.
	Used_delay int
}

// group is a set of users that are scheduled together.
type group struct {
	// Group_id is the group's index plus batch*k_val (see pattern).
	Group_id int
	// Users_list holds the members of the group.
	Users_list []*users
}
- //about pattern()
- var k_val int
- var batch int
- var count_cover_delay_lock sync.Mutex
- var total_used_cover int
- var total_used_delay int
- var int_to_group map[int]*group
- var grouplist []*group
- var online_users_hash map[string]*users
- var online_users_hash_lock sync.Mutex
- var users_channel chan *users
- var users_channel_wg sync.WaitGroup
- var eliminated_users_list []string
- var allow_hash map[string]bool
- //Communicate with agent. The process used to determine the list of users who need to send messages, i.e. the non-fixed scheduler
- func before_finalsend() {
- c1, c_err := net.Dial("tcp", "192.168.32.144:20010") //agent address
- if c_err != nil {
- log.Println("err,,dial 192.168.32.144:20010 failed", c_err)
- }
- wantToSend_list := make([]string, 0)
- //1,send the active users list to agent
- for k := range online_users_hash {
- if wantToSendHash[k] == 1 {
- wantToSend_list = append(wantToSend_list, k)
- }
- }
- if len(wantToSend_list) > 0 {
- sendToAgent := ""
- for i, v := range wantToSend_list {
- sendToAgent = sendToAgent + v + ","
- if (i%6000 == 0 && i != 0) || i == len(wantToSend_list)-1 {
- sendToCli := []byte(sendToAgent)
- c1.Write(sendToCli)
- sendToAgent = ""
- }
- }
- time.Sleep(time.Second * 2)
- c1.Write([]byte("changeEnd"))
- } else {
- c1.Write([]byte("noChange"))
- }
- //2,get feedback from agent
- listener, err := net.Listen("tcp", "192.168.32.144:20012")
- if err != nil {
- fmt.Println("err 192.168.32.144:20012 failed", err)
- return
- }
- conn, err := listener.Accept()
- if err != nil {
- fmt.Println("conn err before()", err)
- return
- }
- var tmp [10000]byte
- res := ""
- for {
- n, err := conn.Read(tmp[:])
- if err != nil {
- fmt.Println("read from conn failed", err)
- break
- }
- //no feedback
- if string(tmp[:8]) == "noChange" {
- listener.Close()
- break
- } else {
- result := string(tmp[:n])
- res = res + result
- if string(tmp[:9]) == "changeEnd" {
- fmt.Println("receive feedback end")
- listener.Close()
- break
- }
- }
- }
- res = strings.Replace(res, " ", "", -1)
- res = strings.Replace(res, "\n", "", -1)
- str_arr := strings.Split(res, ",")
- for _, str := range str_arr {
- allow_hash[str] = true
- }
- log.Println("before ends", len(str_arr)-1)
- }
- //Communicate with agent. The process used to determine the list of users who need to send messages, i.e. the non-fixed scheduler
- func pattern() {
- log.Println("pattern")
- filePath1 := "/home/it/middle_data/filtered_good_name_list.txt"
- content1, err1 := ioutil.ReadFile(filePath1)
- if err1 != nil {
- fmt.Println("err1,", err1)
- panic(err1)
- }
- m1 := strings.Split(string(content1), "\n")
- filePath2 := "/home/it/middle_data/groupingResult.txt"
- content2, err2 := ioutil.ReadFile(filePath2)
- if err2 != nil {
- fmt.Println("err2", err2)
- panic(err2)
- }
- m2 := strings.Split(string(content2), "\n")
- for i := 0; i < k_val-1; i++ {
- var g group
- g.Group_id = i + batch*k_val
- g.Users_list = make([]*users, 0)
- int_to_group[i] = &g
- grouplist = append(grouplist, &g)
- }
- //creat structure for each user
- for i := 0; i < len(m2)-1; i++ {
- var u users
- u.Uid = m1[i]
- u.Chance = 10
- u.Bonus = 0
- u.Queue = make([][]byte, 0)
- middle, _ := strconv.Atoi(m2[i])
- u.Group = int_to_group[middle]
- u.Used_cover = 0
- u.Used_delay = 0
- int_to_group[middle].Users_list = append(int_to_group[middle].Users_list, &u)
- online_users_hash_lock.Lock()
- online_users_hash[u.Uid] = &u
- online_users_hash_lock.Unlock()
- }
- log.Println("pattern ends", len(m2)-1)
- }
- //Prepare for the sending of all users with scheduler
- func send_online(c net.Conn) {
- defer log.Println("send_online ends")
- users_channel = make(chan *users, 100000)
- log.Println("send_online")
- for _, v := range online_users_hash {
- if v == nil {
- continue
- }
- if v.Group.Group_id%k_val != k_val-2 {
- users_channel <- v
- }
- }
- close(users_channel)
- for i := 0; i < 1000; i++ {
- users_channel_wg.Add(1)
- go send_each_user(c)
- }
- users_channel_wg.Wait()
- }
- //send func for each user with non-fixed scheduer;
- //Determine the sending behavior of each user by the number of feedbacks and chances at the agent
- func send_each_user(c net.Conn) {
- defer users_channel_wg.Done()
- for {
- u, ok := <-users_channel
- if !ok {
- return
- } else {
- if allow_hash[u.Uid] {
- if len(u.Queue) != 0 {
- x := u.Queue[0]
- u.Queue = u.Queue[1:]
- c.Write(x)
- u.Bonus++
- if u.Bonus >= 10 {
- u.Bonus = 0
- if u.Chance < 3 { //bonus-based or chance-based
- u.Chance = u.Chance + 0
- }
- }
- continue
- } else {
- if u.Chance > 0 {
- cover := &megred{
- []string{},
- []string{},
- "",
- "cover",
- "yes",
- }
- cov_mid := *cover
- cov_mid.Megred_User_Id = u.Uid
- cov_mid.Cover_Msg = "yes"
- cov_mid.Megred_Hashtags = []string{"[cover]"}
- cov_mid.Megred_Ip = IpList[u.Uid]
- cov_mid.Megred_Timestamp = []string{"000000000"}
- jsons, err_json := json.Marshal(cov_mid)
- if err_json != nil {
- fmt.Println("err_json=", err_json)
- }
- total_length := packetLength
- padding_length := total_length - len(jsons)
- pd := make([]byte, padding_length)
- jsons = append(jsons, pd...)
- if len(jsons) != packetLength {
- fmt.Println("cover=", jsons)
- }
- c.Write(jsons)
- u.Chance = u.Chance - 1
- u.Used_cover++
- continue
- } else {
- continue
- }
- }
- } else {
- if len(u.Queue) > 0 {
- if u.Chance > 0 {
- u.Chance = u.Chance - 1
- u.Used_delay++
- continue
- } else {
- x := u.Queue[0]
- u.Queue = u.Queue[1:]
- c.Write(x)
- continue
- }
- } else {
- continue
- }
- }
- }
- }
- }
- //Similar to v1, but only the merged message is cached in the golden scheduler queue;
- //whether to send it or not is determined by the above function "send_each_user"
- func sendOut_real(list <-chan string, c net.Conn, cover *megred) {
- defer finalSend_wg_real.Done()
- time.Sleep(time.Millisecond * 100)
- conn := GetConn()
- conn.Do("select", "2")
- for {
- v, ok := <-list
- if !ok {
- return
- }
- status_v := Status[v]
- switch status_v {
- case 0:
- arrival_msg_number++
- case 1:
- learning_msg_number_wg.Lock()
- learning_msg_number++
- learning_msg_number_wg.Unlock()
- case 2:
- online_msg_number_wg.Lock()
- online_msg_number++
- test_list = append(test_list, v)
- online_msg_number_wg.Unlock()
- }
- rst := merge(v, conn)
- jsons, err_json := json.Marshal(*rst)
- if err_json != nil {
- fmt.Println("err_json=", err_json)
- }
- total_length := packetLength
- padding_length := total_length - len(jsons)
- if jsons[len(jsons)-1] != 125 {
- fmt.Println("rst=", rst)
- break
- }
- if padding_length <= 0 {
- fmt.Println("longer than packetLength,v=", v, "jsons length", len(jsons), "use cover instead of it")
- cov_mid := *cover
- cov_mid.Megred_User_Id = v
- cov_mid.Cover_Msg = "no"
- cov_mid.Megred_Hashtags = []string{"[other]"}
- cov_mid.Megred_Ip = IpList[v]
- cov_mid.Megred_Timestamp = []string{"000000000"}
- jsons, err_json := json.Marshal(cov_mid)
- if err_json != nil {
- fmt.Println("err_json=", err_json)
- }
- total_length := packetLength
- padding_length := total_length - len(jsons)
- pd := make([]byte, padding_length)
- jsons = append(jsons, pd...)
- if len(jsons) != packetLength {
- fmt.Println("cover=", jsons)
- }
- if status_v != 2 || online_users_hash[v] == nil {
- c.Write(jsons)
- real_lock.Lock()
- realNumber++
- real_lock.Unlock()
- continue
- }
- //online
- if status_v == 2 {
- if online_users_hash[v].Group.Group_id%k_val != k_val-2 {
- online_users_hash_lock.Lock()
- middle_user := online_users_hash[v]
- online_users_hash_lock.Unlock()
- middle_user.Queue = append(middle_user.Queue, jsons)
- continue
- } else { //outliers
- c.Write(jsons)
- real_lock.Lock()
- realNumber++
- real_lock.Unlock()
- continue
- }
- }
- continue
- }
- pd := make([]byte, padding_length)
- jsons = append(jsons, pd...)
- if len(jsons) != packetLength {
- fmt.Println("err,rst=", rst)
- }
- //low frequence
- if status_v != 2 || online_users_hash[v] == nil {
- c.Write(jsons)
- real_lock.Lock()
- realNumber++
- real_lock.Unlock()
- continue
- }
- //online
- if status_v == 2 { //normal users
- if online_users_hash[v].Group.Group_id%k_val != k_val-2 {
- online_users_hash_lock.Lock()
- middle_user := online_users_hash[v]
- online_users_hash_lock.Unlock()
- middle_user.Queue = append(middle_user.Queue, jsons)
- continue
- } else { //outliers
- c.Write(jsons)
- real_lock.Lock()
- realNumber++
- real_lock.Unlock()
- continue
- }
- }
- }
- }
- //Similar to v1
- func sendOut_cover(list <-chan string, c net.Conn, cover *megred) {
- defer finalSend_wg_cov.Done()
- time.Sleep(time.Millisecond * 100)
- for {
- v, ok := <-list
- if !ok {
- break
- } else {
- if wantToSendHash[v] == 1 {
- continue
- } else {
- status_v := Status[v]
- switch status_v {
- case 1:
- learning_msg_number_wg.Lock()
- learning_msg_number++
- learning_msg_number_wg.Unlock()
- }
- cov_mid := *cover
- cov_mid.Megred_User_Id = v
- cov_mid.Cover_Msg = "yes"
- cov_mid.Megred_Hashtags = []string{"[cover]"}
- cov_mid.Megred_Ip = IpList[v]
- cov_mid.Megred_Timestamp = []string{"000000000"}
- jsons, err_json := json.Marshal(cov_mid)
- if err_json != nil {
- fmt.Println("err_json=", err_json)
- }
- total_length := packetLength
- padding_length := total_length - len(jsons)
- pd := make([]byte, padding_length)
- jsons = append(jsons, pd...)
- if len(jsons) != packetLength {
- fmt.Println("err,cover=", jsons)
- }
- c.Write(jsons)
- cov_lock.Lock()
- covNumber++
- cov_lock.Unlock()
- }
- }
- }
- }
- //same with v1
- func roundEnd(c net.Conn) {
- x := []byte("roundEnd")
- total_length := packetLength
- padding_length := total_length - len(x)
- pd := make([]byte, padding_length)
- x = append(x, pd...)
- if len(x) != packetLength {
- fmt.Println("err,x=", x)
- }
- c.Write(x)
- }
- var finalSend_wg_cov sync.WaitGroup
- var finalSend_wg_real sync.WaitGroup
- //similar with v1, but from scheduler queue
- func finalSend(c net.Conn) {
- //创建cover msg
- cover := &megred{
- []string{},
- []string{},
- "",
- "cover",
- "yes",
- }
- real_msg_channel = make(chan string)
- cover_msg_channel = make(chan string)
- for i := 0; i < 1000; i++ {
- finalSend_wg_real.Add(1)
- finalSend_wg_cov.Add(1)
- go sendOut_real(real_msg_channel, c, cover)
- go sendOut_cover(cover_msg_channel, c, cover)
- }
- putWantToSendHashIntoChannel(real_msg_channel)
- putStatus_learningIntoChannel(cover_msg_channel)
- close(real_msg_channel)
- close(cover_msg_channel)
- finalSend_wg_real.Wait()
- finalSend_wg_cov.Wait()
- time.Sleep(time.Millisecond * 100)
- send_online(c)
- roundEnd(c)
- }
- func initDB1_DB2() {
- conn := GetConn()
- conn.Do("select", "1")
- conn.Do("flushdb")
- conn.Do("keys *")
- conn.Do("select", "2")
- conn.Do("flushdb")
- conn.Do("keys *")
- }
- func start() {
- initDB1_DB2()
- times = 0
- IpList = make(map[string]string)
- Status = make(map[string]int)
- NameList = make([]string, 0)
- status_learning = make([]string, 0)
- packetLength = 3000
- k_val = 15
- batch = 0
- int_to_group = make(map[int]*group)
- grouplist = make([]*group, 0)
- total_used_cover = 0
- total_used_delay = 0
- online_users_hash = make(map[string]*users)
- eliminated_users_list = make([]string, 0)
- }
// timeAddOneHour parses input as a local time in the layout "20060102150405"
// (yyyyMMddHHmmss), adds one hour and returns the result in the same layout.
// A parse error is printed; the zero time plus one hour is formatted then.
// (Original note: same as v1.)
//
// To change the length of a round, change the duration added below (the
// original kept 30-minute and 2-hour variants as commented-out alternatives).
func timeAddOneHour(input string) (outpute string) {
	const layout = "20060102150405"

	loc, err_loc := time.LoadLocation("Local")
	if err_loc != nil {
		fmt.Println("err_loc=", err_loc)
	}
	parsed, err_the_time := time.ParseInLocation(layout, input, loc)
	if err_the_time != nil {
		fmt.Println("err_the_time=", err_the_time)
	}
	return parsed.Add(time.Hour).Format(layout)
}
- var learning_phase_start_round int
- var round int
- //same with v1
- func StatusChangeOrNot_arrival_to_learning() {
- defer log.Println("StatusChangeOrNot_arrival_to_learning() ends")
- fmt.Println("StatusChangeOrNot_arrival_to_learning() starts")
- listener, err := net.Listen("tcp", "192.168.32.144:20002")
- if err != nil {
- fmt.Println("listen 192.168.32.144:20002 failed", err)
- return
- }
- conn, err := listener.Accept()
- if err != nil {
- fmt.Println("conn err", err)
- return
- }
- var tmp [10000]byte
- res := ""
- for {
- n, err := conn.Read(tmp[:])
- if err != nil {
- fmt.Println("read from conn failed", err)
- break
- }
- if string(tmp[:8]) == "noChange" {
- log.Println("no changed status")
- listener.Close()
- return
- } else {
- result := string(tmp[:n])
- res = res + result
- if string(tmp[:9]) == "changeEnd" {
- listener.Close()
- break
- }
- }
- }
- res = strings.Replace(res, " ", "", -1)
- res = strings.Replace(res, "\n", "", -1)
- str_arr := strings.Split(res, ",")
- learning_phase_start_round = round
- for _, str := range str_arr {
- Status[str] = 1
- status_learning = append(status_learning, str)
- }
- fmt.Println("status_learning length=", len(status_learning))
- }
- //same with v1
- func StatusChangeOrNot_eliminated() {
- defer fmt.Println("StatusChangeOrNot_eliminated ends")
- log.Println("StatusChangeOrNot_eliminated() starts")
- listener, err := net.Listen("tcp", "192.168.32.144:20002")
- if err != nil {
- fmt.Println("StatusChangeOrNot_eliminated() listen 192.168.32.144:20002 failed", err)
- return
- }
- conn, err := listener.Accept()
- if err != nil {
- fmt.Println("conn err", err)
- return
- }
- var tmp [10000]byte
- res := ""
- for {
- n, err := conn.Read(tmp[:])
- if err != nil {
- fmt.Println("read from conn failed", err)
- break
- }
- if string(tmp[:8]) == "noChange" {
- fmt.Println("no changed status")
- listener.Close()
- return
- } else {
- result := string(tmp[:n])
- res = res + result
- if string(tmp[:9]) == "changeEnd" {
- listener.Close()
- break
- }
- }
- }
- res = strings.Replace(res, " ", "", -1)
- res = strings.Replace(res, "\n", "", -1)
- str_arr := strings.Split(res, ",")
- for _, str := range str_arr {
- Status[str] = 3
- online_users_hash_lock.Lock()
- middle_u := online_users_hash[str]
- online_users_hash_lock.Unlock()
- delete(online_users_hash, str)
- if middle_u == nil {
- continue
- }
- if middle_u.Group.Group_id%k_val == k_val-2 {
- continue
- }
- count_cover_delay_lock.Lock()
- total_used_cover = total_used_cover + middle_u.Used_cover
- total_used_delay = total_used_delay + middle_u.Used_delay
- count_cover_delay_lock.Unlock()
- }
- log.Println("eliminated users number", len(str_arr))
- }
- //same with v1
- func refresh_status_learning() {
- if round-learning_phase_start_round == 24 {
- fmt.Println("进入refresh_status_learning")
- fmt.Println("len(status_learning)=", len(status_learning))
- status_learning = make([]string, 0)
- }
- }
- //same with v1
- func StatusChangeOrNot_learning_to_online() {
- defer log.Println("StatusChangeOrNot_learning_to_online() ends")
- log.Println("StatusChangeOrNot_learning_to_online() starts")
- listener, err := net.Listen("tcp", "192.168.32.144:20002")
- if err != nil {
- fmt.Println("StatusChangeOrNot_learning_to_online() listen 192.168.32.144:20002 failed", err)
- return
- }
- conn, err := listener.Accept()
- if err != nil {
- fmt.Println("accept failed", err)
- return
- }
- var tmp [10000]byte
- res := ""
- for {
- n, err := conn.Read(tmp[:])
- if err != nil {
- fmt.Println("read from conn failed", err)
- break
- }
- if string(tmp[:8]) == "noChange" {
- log.Println("no changed status")
- listener.Close()
- return
- } else {
- result := string(tmp[:n])
- res = res + result
- if string(tmp[:9]) == "changeEnd" {
- listener.Close()
- break
- }
- }
- }
- res = strings.Replace(res, " ", "", -1)
- res = strings.Replace(res, "\n", "", -1)
- str_arr := strings.Split(res, ",")
- number := 0
- for i := 0; i < len(str_arr)-1; i++ {
- Status[str_arr[i]] = 2
- number++
- }
- fmt.Println("changed status numbers=", number)
- pattern()
- batch = batch + 1
- fmt.Println("someone has changed")
- }
// double rounds value to two decimal places by formatting it with two
// fractional digits and parsing the text back. A parse error is ignored.
func double(value float64) float64 {
	text := strconv.FormatFloat(value, 'f', 2, 64)
	rounded, _ := strconv.ParseFloat(text, 64)
	return rounded
}
- func main() {
- //log
- logFile, err := os.OpenFile("log.txt", os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666)
- if err != nil {
- panic(err)
- }
- defer logFile.Close()
- mw := io.MultiWriter(os.Stdout, logFile)
- log.SetOutput(mw)
- start()
- channelSize = 150000
- //1st test point
- lastTimesIndex := 0
- roundEndTime := "20121101130000"
- //2ed test point
- //lastTimesIndex := 1479975
- //roundEndTime := "20121110130000"
- //3rt test point
- //lastTimesIndex := 3751916
- //roundEndTime := "20121120130000"
- for i := 0; i < 35; i++ {
- log.Println("------------------------------------------------------------the", i, "th round starts------------------------------------------------------------------")
- c := getHttpConnection()
- allow_hash = make(map[string]bool)
- arrival_msg_number = 0
- learning_msg_number = 0
- online_msg_number = 0
- rejoin_number = 0
- round = i
- covNumber = 0
- realNumber = 0
- lastTimesIndex = test2(roundEndTime, lastTimesIndex)
- fmt.Println("lastTimesIndex=", lastTimesIndex)
- roundEndTime = timeAddOneHour(roundEndTime)
- fmt.Println("roundEndTime=", roundEndTime)
- fmt.Println("status length", len(Status), "NameList length", len(NameList), "status_learning length", len(status_learning))
- before_finalsend()
- finalSend(c)
- refresh_status_learning()
- fmt.Println("Status length", len(Status))
- StatusChangeOrNot_arrival_to_learning()
- StatusChangeOrNot_learning_to_online()
- StatusChangeOrNot_eliminated()
- fmt.Println("covNumber=", covNumber)
- fmt.Println("realNumber=", realNumber)
- fmt.Println("arrival_msg_number=", arrival_msg_number, "learning_msg_number=", learning_msg_number, "online_msg_number=", online_msg_number)
- log.Println("rejoin_number", rejoin_number)
- log.Println("------------------------------------------------------------the", i, "th round ends------------------------------------------------------------------")
- }
- log.Println("total_used_cover,total_used_delay=", total_used_cover, total_used_delay)
- used_delay_in_each_group := make([]int, 0)
- used_cover_in_each_group := make([]int, 0)
- queue_length := make([]int, 0)
- users_number := make([]int, 0)
- total_user_number := 0
- for _, v := range grouplist {
- a := 0
- b := 0
- c := 0
- for _, m := range v.Users_list {
- a = a + m.Used_delay
- b = b + m.Used_cover
- c = c + len(m.Queue)
- }
- used_delay_in_each_group = append(used_delay_in_each_group, a)
- used_cover_in_each_group = append(used_cover_in_each_group, b)
- queue_length = append(queue_length, c)
- users_number = append(users_number, len(v.Users_list))
- }
- log.Println("users_number", users_number)
- log.Println("used_delay_in_each_group", used_delay_in_each_group)
- log.Println("used_cover_in_each_group", used_cover_in_each_group)
- sum := 0
- for _, v := range used_cover_in_each_group {
- sum = sum + v
- }
- log.Println("cover sum=", sum)
- log.Println("len(queue)", queue_length)
- ave_delay := make([]float64, 0)
- ave_cover := make([]float64, 0)
- ave_queue_length := make([]float64, 0)
- total_delay := float64(0)
- total_cover := float64(0)
- total_queue := float64(0)
- for i := range grouplist {
- total_delay = total_delay + float64(used_delay_in_each_group[i])
- total_cover = total_cover + float64(used_cover_in_each_group[i])
- total_queue = total_queue + float64(queue_length[i])
- }
- for i := range grouplist {
- a := double(float64(used_delay_in_each_group[i]) / float64(users_number[i]))
- b := double(float64(used_cover_in_each_group[i]) / float64(users_number[i]))
- c := double(float64(queue_length[i]) / float64(users_number[i]))
- ave_delay = append(ave_delay, a)
- ave_cover = append(ave_cover, b)
- ave_queue_length = append(ave_queue_length, c)
- }
- for i, v := range users_number {
- if i != k_val-1 && i != k_val-2 {
- total_user_number = total_user_number + v
- }
- }
- aa := double(total_delay / float64(total_user_number))
- bb := double(total_cover / float64(total_user_number))
- cc := double(total_queue / float64(total_user_number))
- log.Println("average delay,cover,queue are", aa, bb, cc, "eliminated users number", total_user_number)
- }
|