package main import ( "encoding/json" "fmt" "io" "io/ioutil" "log" "math" "net" "os" "os/exec" "strconv" "strings" "sync" "time" // "github.com/go-echarts/go-echarts/v2/charts" // "github.com/go-echarts/go-echarts/v2/opts" // "github.com/go-echarts/go-echarts/v2/types" "github.com/go-echarts/go-echarts/v2/charts" "github.com/go-echarts/go-echarts/v2/opts" "github.com/go-echarts/go-echarts/v2/types" "github.com/gomodule/redigo/redis" ) var channelSize int var connID int type Rec struct { Megred_Timestamp []string Merged_Hashtags []string Megred_User_Id string Megred_Ip string Cover_Msg string } // users in arrival phase; when from arrival to learning, agent need to send their arrival messages to agent var arrival_phase_start_list []int var arrival_phase_number_list []int //each batch new users' number var arrival_phase_number_list_lock sync.Mutex //each batch's length var average_arrival_length []int var Status sync.Map //each user' status var NameList []string //users' uid var NameList_Append_Lock sync.Mutex // var Batch_List []string //users in eahc batch var Batch_List_Append_Lock sync.Mutex var Batch_list_threshold int //batch threshold //used for receive message from clients var putInChannel chan string var receive_wg sync.WaitGroup var peocessConn_wg sync.WaitGroup var uid_channel chan string var uip_channel chan string //db14, used as user queue in arrival phase var db14_agentqueue_key chan string var db14_agentqueue_val chan string var db14_agentqueue_Lock sync.Mutex //db13,used to store users' status during learning phase var db13_learning_key chan string var db13_learning_val chan int //0-cover message;1-real message var db13_learning_Lock sync.Mutex //db12,used as user queue in online phase var db12_online_key chan string var db12_online_val chan string var db12_online_Lock sync.Mutex var online_msg_number int var online_msg_number_empty int var online_msg_in_each_group_in_each_round map[int][]int var group_size_in_each_round map[int][]int var online_msg_in_each_group_in_each_round_middle_wg sync.Mutex var online_msg_in_each_group_in_each_round_middle []int //from uid to group id var user_group_hashmap map[string]int //used in arrival to learning phase var sendDBToReceiver chan string var cleanDBToReceiver_wg sync.WaitGroup var sendDBToReceiver_lock sync.Mutex //count cover and real's number var cov_rec int var real_rec int var cov_lock sync.Mutex var real_lock sync.Mutex var packetLength int var groupSet_Number int var number_group_hashmap map[int]*Group type Group struct { Group_Id int Group_User_List []string Group_Size int Group_Size_copy int Group_Active_Threshold int Group_active bool Group_low_frequence bool Group_start_round int Group_online_sum_length int Group_delay_duration_sum time.Duration Group_outlier_flag bool Group_send_times int Group_sleep_times int } var group_list []*Group //function used to receive messages func receive(c net.Conn) { // defer receive_wg.Done() for { a, ok_return := <-putInChannel if !ok_return { return } mid := []byte(a) length := 0 if a == "" { continue } for i := len(a) - 1; ; i-- { if i < 0 { log.Println("a=", a, "len(a)=", len(a), "len(mid)=", len(mid), "mid=", mid) } if mid[i] == 125 { //125=} length = i break } } mid = mid[:length+1] var stru map[string]interface{} err_unmarshal := json.Unmarshal(mid, &stru) if err_unmarshal != nil { log.Println("json err,err_unmarshal=", err_unmarshal, "len(mid=)", len(mid), "len(a)=", len(a)) } var structure Rec //1,time times, err_time := redis.Strings(stru["Megred_Timestamp"], nil) if err_time != nil { log.Println("err_time", err_time, "a=", a) } structure.Megred_Timestamp = times //2,msg msgs, err_msgs := redis.Strings(stru["Megred_Hashtags"], nil) if err_msgs != nil { log.Println("err_msgs", err_msgs, "符串a", a) } structure.Merged_Hashtags = msgs //3,uid uid, err_uid := redis.String(stru["Megred_User_Id"], nil) if err_uid != nil { log.Println("err_uid", err_uid, "a", a) } structure.Megred_User_Id = uid //4,ip ip, err_ip := redis.String(stru["Megred_Ip"], nil) if err_ip != nil { log.Println("err_ip", err_ip, "a", a) } structure.Megred_Ip = ip //5.cover cov, err_cover := redis.String(stru["Cover_Msg"], nil) if err_cover != nil { log.Println("err_cover", err_cover) break } structure.Cover_Msg = cov //process users according to their status //1,new user,store it in db14 staa, ok := Status.Load(structure.Megred_User_Id) if !ok { if structure.Megred_User_Id == "" { continue } Status.Store(structure.Megred_User_Id, 0) NameList_Append_Lock.Lock() NameList = append(NameList, structure.Megred_User_Id) NameList_Append_Lock.Unlock() Batch_List_Append_Lock.Lock() Batch_List = append(Batch_List, structure.Megred_User_Id) Batch_List_Append_Lock.Unlock() db14_agentqueue_Lock.Lock() db14_agentqueue_key <- structure.Megred_User_Id db14_agentqueue_val <- a db14_agentqueue_Lock.Unlock() arrival_phase_start_list = append(arrival_phase_start_list, round) continue } else { //a,arrival phase if staa == 0 { db14_agentqueue_Lock.Lock() db14_agentqueue_key <- structure.Megred_User_Id db14_agentqueue_val <- a db14_agentqueue_Lock.Unlock() continue } //b, learning phase if staa == 1 { //(1),cover message if structure.Cover_Msg == "yes" { db13_learning_Lock.Lock() db13_learning_key <- structure.Megred_User_Id db13_learning_val <- 0 db13_learning_Lock.Unlock() cov_lock.Lock() cov_rec++ cov_lock.Unlock() } else { //real db13_learning_Lock.Lock() db13_learning_key <- structure.Megred_User_Id db13_learning_val <- 1 db13_learning_Lock.Unlock() real_lock.Lock() real_rec++ real_lock.Unlock() } //(2),directly send it to recipient c.Write(mid) continue } //c,online if staa == 2 { db12_online_Lock.Lock() db12_online_key <- structure.Megred_User_Id db12_online_val <- a online_msg_number++ db12_online_Lock.Unlock() online_msg_in_each_group_in_each_round_middle_wg.Lock() gid := user_group_hashmap[structure.Megred_User_Id] online_msg_in_each_group_in_each_round_middle[gid]++ online_msg_in_each_group_in_each_round_middle_wg.Unlock() continue } } } } //put message into db14 var db14_lpush_wg sync.WaitGroup var db14_lpush_lock sync.Mutex func db14_lpush() { defer db14_lpush_wg.Done() arrival_msg_num := 0 log.Println("db14_rpush()") c_db14, _ := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379)) c_db14.Do("select", 14) for { db14_lpush_lock.Lock() x, ok1 := <-db14_agentqueue_key y, ok2 := <-db14_agentqueue_val db14_lpush_lock.Unlock() if ok1 && ok2 { c_db14.Do("lpush", x, y) arrival_msg_num++ } if (!ok1 || !ok2) && db13_db14_db12_waitForReceive_flag { log.Println("db14_lpush() ends,arrival_msg_num=", arrival_msg_num) return } } } //db13 var db13_rpush_wg sync.WaitGroup var db13_rpush_lock sync.Mutex func db13_rpush() { defer db13_rpush_wg.Done() log.Println("db13_rpush()") real := 0 cov := 0 c_db13, _ := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379)) c_db13.Do("select", 13) for { db13_rpush_lock.Lock() x, ok1 := <-db13_learning_key y, ok2 := <-db13_learning_val db13_rpush_lock.Unlock() if ok1 && ok2 { if y == 0 { cov = cov + 1 } else { real = real + 1 } c_db13.Do("rpush", x, y) } if db13_db14_db12_waitForReceive_flag && (!ok1 || !ok2) { log.Println("db13_rpush()ends,real,cov are", real, cov, "total number=", real+cov) return } } } //db12 var db12_lpush_wg sync.WaitGroup var db12_lpush_lock sync.Mutex var test_list []string func db12_lpush() { defer db12_lpush_wg.Done() online_msg := 0 log.Println("db12_lpush()starts") c_db12, _ := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379)) c_db12.Do("select", 12) for { db12_lpush_lock.Lock() x, ok1 := <-db12_online_key test_list = append(test_list, x) y, ok2 := <-db12_online_val db12_lpush_lock.Unlock() if ok1 && ok2 { c_db12.Do("lpush", x, y) online_msg++ } if db13_db14_db12_waitForReceive_flag && (!ok1 || !ok2) { log.Println("db12_lpush() ends,online_msg", online_msg) return } } } //init db12,13,14 func initDb() { log.Println("initDb()执行完成,已经清空db13,db14") c, c_err := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379)) if c_err != nil { log.Println("连接数据库出错,c_err=", c_err) } c.Do("select", 14) c.Do("Flushdb") c.Do("select", 13) c.Do("Flushdb") c.Do("select", 12) c.Do("Flushdb") } //remake channels and some slices func initChannelAndListAndMap() { log.Println("initChannelAndListAndMap") putInChannel = make(chan string, channelSize) db14_agentqueue_key = make(chan string, channelSize) db14_agentqueue_val = make(chan string, channelSize) db13_learning_key = make(chan string, channelSize) db13_learning_val = make(chan int, channelSize) db12_online_key = make(chan string, channelSize) db12_online_val = make(chan string, channelSize) } var db13_db14_db12_waitForReceive_flag bool //process all receive work func peocessConn(conn net.Conn, connID int) { db13_db14_db12_waitForReceive_flag = false log.Println("peocessConn starts") initChannelAndListAndMap() db14_lpush_wg.Add(1) go db14_lpush() db13_rpush_wg.Add(1) go db13_rpush() db12_lpush_wg.Add(1) go db12_lpush() //connect the recipients c, c_err := net.Dial("tcp", "192.168.32.144:20001") //recipient address if c_err != nil { log.Println("err,dial 192.168.32.144:20001 failed", c_err) } for i := 0; i < 500; i++ { receive_wg.Add(1) go receive(c) } var tmp [3000]byte index := 0 total_msg_number := 0 for { total_msg_number++ _, err := conn.Read(tmp[:]) if err != nil { log.Println("read from conn failed", err) return } if string(tmp[:8]) == "roundEnd" { log.Println("client ends the conn, total_msg_number", total_msg_number) break } //Solve the sticky bag problem if tmp[0] != 123 { var corrext_tmp []byte var corrext_follow []byte var correct_length int for i, v := range tmp[:] { if v == 123 { correct_length = i corrext_follow = make([]byte, correct_length) _, err_follow := conn.Read(corrext_follow[:]) if err_follow != nil { log.Println("read from conn failed,err_follow=", err_follow) return } corrext_tmp = append(tmp[i:], corrext_follow[:]...) if len(corrext_tmp) != packetLength { log.Println("corrext_tmp length!=2000", len(corrext_tmp)) } break } } putInChannel <- string(corrext_tmp[:]) index++ log.Println("total_msg_number=", total_msg_number) continue } putInChannel <- string(tmp[:]) } close(putInChannel) receive_wg.Wait() //Determine if each anonymity set is online if len(group_list) > 0 { for _, v := range group_list { online_msg_in_each_group_in_each_round[v.Group_Id] = append(online_msg_in_each_group_in_each_round[v.Group_Id], online_msg_in_each_group_in_each_round_middle[v.Group_Id]) number := float64(0) if v.Group_outlier_flag { number = float64(group_list[v.Group_Id].Group_Size*group_Threshold_outlier) / float64(100) } else { number = float64(group_list[v.Group_Id].Group_Size*group_list[v.Group_Id].Group_Active_Threshold) / float64(100) } if number >= float64(online_msg_in_each_group_in_each_round_middle[v.Group_Id]) && group_list[v.Group_Id].Group_sleep_times <= Group_sleep_times_threshold { group_list[v.Group_Id].Group_active = false group_list[v.Group_Id].Group_sleep_times++ } else { group_list[v.Group_Id].Group_active = true group_list[v.Group_Id].Group_sleep_times = 0 } } } db13_db14_db12_waitForReceive_flag = true close(db14_agentqueue_key) close(db14_agentqueue_val) close(db13_learning_key) close(db13_learning_val) close(db12_online_key) close(db12_online_val) db13_rpush_wg.Wait() db14_lpush_wg.Wait() db12_lpush_wg.Wait() log.Println("peocessConn ends, NameList length=", len(NameList), "Batch_Listlength =", len(Batch_List)) } //send all arrival messages to recipients func cleanDBToReceiver(httpConn net.Conn) { defer cleanDBToReceiver_wg.Done() time.Sleep(time.Millisecond * 300) c, _ := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379)) c.Do("select", 14) for { x, ok := <-sendDBToReceiver if ok { sendDBToReceiver_lock.Lock() queueLen, _ := c.Do("llen", x) ql, err_ql := redis.Int(queueLen, nil) if err_ql != nil { log.Println("db14,err_ql=", err_ql) } for i := 0; i < ql; i++ { result, err_rpop := c.Do("rpop", x) if err_rpop != nil { log.Println("db14,err_rpop=", err_rpop) } res, err_res := redis.String(result, nil) if err_res != nil { log.Println("db14,err_res=", err_res) } jsons, err_jsons := json.Marshal(res) if err_jsons != nil { log.Println("db14, err_jsons=", err_jsons) } httpConn.Write(jsons) } sendDBToReceiver_lock.Unlock() } else { return } } } var learningphase_start_round int var round int var last_Batch_List []string var last_Batch_List_to_online []string var lastLearningPhase_end_flag bool /* 1,users in batch_list,set their status to 1,means they enter the learning phase 2,clean db14中,send arrival message to recipients 3,tell clients,which users have changed their status 4,clean up batch-list 5, */ func arrivalToLearn1() bool { log.Println("arrivalToLearn1()") time.Sleep(time.Millisecond * 300) //enough new users if len(Batch_List) >= Batch_list_threshold { if !lastLearningPhase_end_flag { log.Println("last learning phase still not ends, must wait") httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002") if err1 != nil { log.Println("err,dial 192.168.32.144:20002 failed", err1) } sendToCli := []byte("noChange") httpConn_client.Write(sendToCli) log.Println("arrivalToLearn1() ends, no one changes status") return false } //they can enter learning phase log.Println("before enter learning phase, last learningphase_start_round=", learningphase_start_round, "round=", round) learningphase_start_round = round log.Println("after enter learning phase, new learningphase_start_round=", learningphase_start_round, "round=", round) last_Batch_List = make([]string, len(Batch_List)) copy(last_Batch_List, Batch_List) log.Println("len(Batch_List)=", len(Batch_List)) //A:send arrival messages to recipients //a1,conn the recipient httpConn_receiver, err2 := net.Dial("tcp", "192.168.32.144:20001") //recipients address if err2 != nil { log.Println("receiver,dial 192.168.32.144:20001 failed", err2) } //a2,init sendDBToReceiver' channel to store uid in batch list sendDBToReceiver = make(chan string, channelSize) //a3,clean ip db14 for i := 0; i < 100; i++ { cleanDBToReceiver_wg.Add(1) go cleanDBToReceiver(httpConn_receiver) } middle_sum_number := 0 for _, v := range arrival_phase_start_list { middle_sum_number = middle_sum_number + round - v } arrival_phase_number_list_lock.Lock() average_arrival_length = append(average_arrival_length, middle_sum_number/len(arrival_phase_start_list)) arrival_phase_number_list = append(arrival_phase_number_list, len(arrival_phase_start_list)) arrival_phase_number_list_lock.Unlock() arrival_phase_start_list = make([]int, 0) //B,sned feedback to clients httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002") //client's address if err1 != nil { log.Println("err dial 192.168.32.144:20002 failed", err1) } sendToClient := "" for i, v := range Batch_List { sendDBToReceiver <- v Status.Store(v, 1) sendToClient = sendToClient + v + "," if (i%6000 == 0 && i != 0) || i == len(Batch_List)-1 { sendToCli := []byte(sendToClient) httpConn_client.Write(sendToCli) sendToClient = "" } } log.Println("Batch_List length", len(Batch_List)) close(sendDBToReceiver) time.Sleep(time.Second * 2) httpConn_client.Write([]byte("changeEnd")) //C,clean Batch-list Batch_List = make([]string, 0) cleanDBToReceiver_wg.Wait() c, c_err := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379)) if c_err != nil { log.Println("err,c_err=", c_err) } c.Do("select", 14) c.Do("flushdb") log.Println("len(Batch_List)=", len(Batch_List)) log.Println("arrivalToLearn1() ends, someone has changed his status") lastLearningPhase_end_flag = false return true } else { //cannot enter learning phase httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002") if err1 != nil { log.Println("err,dial 192.168.32.144:20002 failed", err1) } sendToCli := []byte("noChange") httpConn_client.Write(sendToCli) log.Println("arrivalToLearn1() ends,no one has changed his status") return false } } //determines the length of the learnin phase; could be 24/48/72 rounds func refresh_learningPhase_flag() bool { defer log.Println("refresh_learningPhase_flag ends") log.Println("refresh_learningPhase_flag starts") if round-learningphase_start_round == 24 { //24//48//72 log.Println("round=", round, "learningphase_start_round=", learningphase_start_round) lastLearningPhase_end_flag = true //In the test only the first batch of users are tested, subsequent batches of users are not allowed to enter the learning phase if round >= 33 { //32//46//80//82,72 rounds lastLearningPhase_end_flag = false return false } last_Batch_List_to_online = make([]string, len(last_Batch_List)) copy(last_Batch_List_to_online, last_Batch_List) return true } return false } //from learning to online func learning_To_online_batchList(flag bool) { //a,arrival to learning doesn't happen log.Println("learning_To_online starts,flag=", flag) time.Sleep(time.Second * 1) if !flag { httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002") if err1 != nil { log.Println("err, dial 192.168.32.144:20002 failed", err1) } sendToCli := []byte("noChange") httpConn_client.Write(sendToCli) log.Println("learning_To_online,no one from learning to online") return } else { //b,it happends httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002") //client's address if err1 != nil { log.Println("err dial 192.168.32.144:20002 failed", err1) } sendToClient := "" for i, v := range last_Batch_List_to_online { //1.status set to 2 Status.Store(v, 2) sendToClient = sendToClient + v + "," //2,tell clients, which users have been online if (i%6000 == 0 && i != 0) || i == len(last_Batch_List_to_online)-1 { sendToCli := []byte(sendToClient) httpConn_client.Write(sendToCli) sendToClient = "" } } time.Sleep(time.Second * 1) httpConn_client.Write([]byte("changeEnd")) } } //learning, do clustering, create anonynity set func learning_function(flag bool) { defer log.Println("learning_function ends") log.Println("start learning_function") if flag { log.Println("round=", round, "learningphase_start_round=", learningphase_start_round) /* 1,taken data from db13, use output.sh,clean db13 2,do clustering, store result ingroupresult 3,create anonymity set and record the users in each set */ //1,output.sh; // /home/it/middle_data/rediskey.txt val value.csv _, err := exec.Command("/bin/bash", "/home/it/middle_data/output.sh").CombinedOutput() if err != nil { log.Println("err,cmd.Output:", err) return } log.Println("output.sh ends") log.Println("do clustering") starttime := time.Now() cmd, err_cmd := exec.Command("python3", "/home/it/learning-phase/final.py").CombinedOutput() if err_cmd != nil { log.Println("err,err_cmd=", err_cmd) return } log.Println("result", string(cmd)) // count time duration cost := int(time.Since(starttime) / time.Second) duration = cost //2,combine.sh _, err_combine := exec.Command("/bin/bash", "/home/it/middle_data/combine.sh").CombinedOutput() if err_combine != nil { log.Println("err,cmd.Output:", err_combine) return } log.Println("combine.sh ends") filePath1 := "/home/it/middle_data/filtered_good_name_list.txt" content1, err1 := ioutil.ReadFile(filePath1) if err1 != nil { panic(err1) } //m1 is key,uid m1 := strings.Split(string(content1), "\n") filePath2 := "/home/it/middle_data/groupingResult.txt" content2, err2 := ioutil.ReadFile(filePath2) if err2 != nil { panic(err1) } //m2 is the clustering result, m2 := strings.Split(string(content2), "\n") log.Println("m1 length=", len(m1), "m2 length=", len(m2)) //create anonynmity set for i := 0; i < k_val; i++ { var g Group g.Group_Id = i + k_val*groupSet_Number g.Group_Active_Threshold = group_Threshold_global g.Group_Size = 0 g.Group_Size_copy = 0 g.Group_active = false g.Group_User_List = make([]string, 0) g.Group_start_round = round g.Group_online_sum_length = 0 g.Group_delay_duration_sum = time.Duration(0) g.Group_send_times = 0 g.Group_sleep_times = 0 //low frequence users if i == k_val-1 { g.Group_low_frequence = true } else { g.Group_low_frequence = false } //outliers if i == k_val-2 { g.Group_outlier_flag = true } else { g.Group_outlier_flag = false } online_msg_in_each_group_in_each_round[g.Group_Id] = make([]int, 0) group_size_in_each_round[g.Group_Id] = make([]int, 0) group_list = append(group_list, &g) number_group_hashmap[i+k_val*groupSet_Number] = &g } for i := 0; i < len(m1)-1; i++ { if i < len(m2)-2 { intm2, err_m2 := strconv.Atoi(m2[i]) if err_m2 != nil { log.Println("err_m2=", err_m2) return } user_group_hashmap[m1[i]] = intm2 + groupSet_Number*k_val middle_g, ok_middle_g := number_group_hashmap[intm2+groupSet_Number*k_val] if !ok_middle_g { log.Println("err,group id=", intm2+groupSet_Number*k_val) } else { middle_g.Group_Size++ middle_g.Group_User_List = append(middle_g.Group_User_List, m1[i]) } } else { user_group_hashmap[m1[i]] = (groupSet_Number+1)*k_val - 1 middle_g, ok_middle_g := number_group_hashmap[(groupSet_Number+1)*k_val-1] if !ok_middle_g { log.Println("err,group id=", (groupSet_Number+1)*k_val-1) return } else { middle_g.Group_Size++ middle_g.Group_User_List = append(middle_g.Group_User_List, m1[i]) } } } sum := 0 for i := 0; i < k_val; i++ { gg := number_group_hashmap[i+k_val*groupSet_Number] log.Println("id=", gg.Group_Id, "i+ k_val*groupSet_Number=", i+k_val*groupSet_Number, "group,size=", gg.Group_Size) sum = sum + gg.Group_Size } log.Println("groupSet_Number=", groupSet_Number) groupSet_Number++ log.Println("update groupSet_Number,groupSet_Number=", groupSet_Number) return } else { return } } var group_active_function_db12_lock sync.Mutex var group_active_function_db12_rpop_lock sync.Mutex var eliminated_user_channel chan string var active_user_channel chan string var k_val int var group_Threshold_global int var group_Threshold_outlier int var delay_limitation int var outlier_delay_limitation int var Group_sleep_times_threshold int //process each group in each round, if it is active,eliminate the offline users func group_active_function(gg *Group, list_1 []string, list_2 []string) { defer log.Println("group_id=", gg.Group_Id, "group_active_function ends,Group_Size=", gg.Group_Size) log.Println("group_id=", gg.Group_Id, "group_active_function ends, Group_Size", gg.Group_Size) c_active, err_active := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379)) if err_active != nil { log.Println("err,err_active=", err_active) return } //1 offline users gg.Group_online_sum_length = gg.Group_online_sum_length + len(list_2)*(round-gg.Group_start_round) gg.Group_Size_copy = gg.Group_Size_copy + len(list_2) for _, v := range list_2 { x, load_ok := Status.Load(v) if load_ok { if x != 2 { err_result = append(err_result, v) } } Status.Delete(v) eliminated_user_channel <- v c_active.Do("select", 12) group_active_function_db12_lock.Lock() _, err_del := c_active.Do("del", v) group_active_function_db12_lock.Unlock() if err_del != nil { log.Println("err_del=", err_del) return } } //2 online users gg.Group_Size = len(list_1) gg.Group_User_List = list_1 c_active.Do("select", 12) for _, v := range list_1 { x, load_ok := Status.Load(v) if load_ok { if x != 2 { err_result = append(err_result, v) } } group_active_function_db12_rpop_lock.Lock() result, err_rpop := c_active.Do("rpop", v) group_active_function_db12_rpop_lock.Unlock() if err_rpop != nil { log.Println("err_rpop=", err_rpop) } res, err_string := redis.String(result, nil) if err_string != nil { log.Println("err_string=", err_string) } active_user_channel <- res mid := []byte(res) length := 0 for i := len(res) - 1; ; i-- { if i < 0 { log.Println("err,res=", res, "len(res)=", len(res), "len(mid)=", len(mid), "mid=", mid) } if mid[i] == 125 { //125=} length = i break } } mid = mid[:length+1] gg.Group_send_times = gg.Group_send_times + 1 var stru map[string]interface{} err_unmarshal := json.Unmarshal(mid, &stru) if err_unmarshal != nil { log.Println("err_unmarshal=", err_unmarshal, "len(mid=)", len(mid)) } //time times, err_time := redis.Strings(stru["Megred_Timestamp"], nil) if err_time != nil { log.Println("err_time", err_time, "res=", res) } msg_time := times[0] loc, err_loc := time.LoadLocation("Local") if err_loc != nil { fmt.Println("err_loc=", err_loc) } //msg time the_time_of_msg, err_the_time_msg := time.ParseInLocation("20060102150405", msg_time, loc) if err_the_time_msg != nil { fmt.Println("err_the_time_msg=", err_the_time_msg) } the_time_of_agent, err_the_time_agent := time.ParseInLocation("20060102150405", "20121101120000", loc) if err_the_time_agent != nil { fmt.Println("err_the_time_agent=", err_the_time_agent) } send_out_timepoint := the_time_of_agent.Add(time.Duration(round+1) * time.Hour) duration := send_out_timepoint.Sub(the_time_of_msg) gg.Group_delay_duration_sum = gg.Group_delay_duration_sum + duration log.Println("group id=", gg.Group_Id, "the_time_of_msg=", the_time_of_msg, "send_out_timepoint=", send_out_timepoint, "duration=", duration) } } var each_group_process_lock sync.Mutex var each_group_process_wg sync.WaitGroup func each_group_process(g *Group) { defer each_group_process_wg.Done() c_db12, err_db12 := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379)) if err_db12 != nil { log.Println("err_db12=", err_db12) return } c_db12.Do("select", 12) middle_active_list := make([]string, 0) middle_inactive_list := make([]string, 0) if g.Group_Size == 1 { middle_inactive_list = append(middle_inactive_list, g.Group_User_List[0]) } else { for _, v := range g.Group_User_List { each_group_process_lock.Lock() agent_queue_length, err_agent_queue_length := c_db12.Do("llen", v) each_group_process_lock.Unlock() if err_agent_queue_length != nil { log.Println("err_agent_queue_length=", err_agent_queue_length) return } length, err_length := redis.Int(agent_queue_length, nil) if err_length != nil { log.Println("err_length=", err_length) return } if g.Group_outlier_flag { //outliers if length == 0 || length > outlier_delay_limitation { middle_inactive_list = append(middle_inactive_list, v) } else { middle_active_list = append(middle_active_list, v) } } else { //normal users if length == 0 || length > delay_limitation { middle_inactive_list = append(middle_inactive_list, v) } else { middle_active_list = append(middle_active_list, v) } } } } number := float64(0) //for outliers if g.Group_outlier_flag == true { number = float64(g.Group_Size*g.Group_Active_Threshold) / float64(100) } if g.Group_active { log.Println("group_id=", g.Group_Id, "threshold number=", number, "active user number", len(middle_active_list)) group_active_function(g, middle_active_list, middle_inactive_list) } else { log.Println("group_id=", g.Group_Id, "threshold number=", number, "active user number", len(middle_active_list)) } log.Println("group_id=", g.Group_Id, "is", g.Group_active, "active user number=", len(middle_active_list), "inactive user number=", len(middle_inactive_list), "group_size=", g.Group_Size, "Group_User_List length=", len(g.Group_User_List)) } var db12_send_to_receiver_wg sync.WaitGroup var active_user_channel_closed bool //send online message to recipient func db12_send_to_receiver(c net.Conn) { defer db12_send_to_receiver_wg.Done() for { x, ok := <-active_user_channel if ok { jsons, err_jsons := json.Marshal(x) if err_jsons != nil { log.Println("db12 err_jsons=", err_jsons) } c.Write(jsons) } else { if active_user_channel_closed { return } else { continue } } } } var eliminated_user_channel_closed bool var online_eliminated_batchList_wg sync.WaitGroup //send the eliminated users namelist to client func online_eliminated_batchList() { defer online_eliminated_batchList_wg.Done() log.Println("online_eliminated_batchList starts") time.Sleep(time.Second * 2) httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002") //client address if err1 != nil { log.Println("conn client err, dial 192.168.32.144:20002 failed", err1) } number := 0 eliminated_list := make([]string, 0) for { v, ok := <-eliminated_user_channel if ok { number++ eliminated_list = append(eliminated_list, v) } else { if eliminated_user_channel_closed { break } else { continue } } } //someone is eliminated if len(eliminated_list) > 0 { sendToClient := "" for i, v := range eliminated_list { sendToClient = sendToClient + v + "," if (i%6000 == 0 && i != 0) || i == len(eliminated_list)-1 { sendToCli := []byte(sendToClient) httpConn_client.Write(sendToCli) sendToClient = "" } } time.Sleep(time.Second * 2) httpConn_client.Write([]byte("changeEnd")) } else { httpConn_client.Write([]byte("noChange")) } log.Println("eliminated user number=", number) } //send online message to recipient func online_cache_process() { c, c_err := net.Dial("tcp", "192.168.32.144:20001") //recipient log.Println("start online_cache_process") if c_err != nil { log.Println("online msg db12 err, dial 192.168.32.144:20001 failed", c_err) } active_user_channel_closed = false eliminated_user_channel_closed = false for i := 0; i < 10; i++ { db12_send_to_receiver_wg.Add(1) go db12_send_to_receiver(c) } online_eliminated_batchList_wg.Add(1) go online_eliminated_batchList() log.Println("group_list length", len(group_list)) for _, v := range group_list { each_group_process_wg.Add(1) each_group_process(v) } each_group_process_wg.Wait() close(active_user_channel) active_user_channel_closed = true close(eliminated_user_channel) eliminated_user_channel_closed = true db12_send_to_receiver_wg.Wait() online_eliminated_batchList_wg.Wait() } func record_group_size() { for _, v := range group_list { group_size_in_each_round[v.Group_Id] = append(group_size_in_each_round[v.Group_Id], v.Group_Size) } } func initMetric() { log.Println("initMetric() starts") Batch_list_threshold = 10000 channelSize = Batch_list_threshold + 10000 round = 0 lastLearningPhase_end_flag = true packetLength = 3000 k_val = 14 group_Threshold_global = 30 group_Threshold_outlier = 30 delay_limitation = 2 outlier_delay_limitation = 2 Group_sleep_times_threshold = 5 } //init some channels and parameters func initNameListAndBatchList() { log.Println("initNameListAndBatchList() starts") Batch_List = make([]string, 0) NameList = make([]string, 0) number_group_hashmap = make(map[int]*Group) groupSet_Number = 0 group_list = make([]*Group, 0) online_msg_in_each_group_in_each_round = make(map[int][]int) user_group_hashmap = make(map[string]int) group_size_in_each_round = make(map[int][]int) err_result = make([]string, 0) arrival_phase_start_list = make([]int, 0) average_arrival_length = make([]int, 0) arrival_phase_number_list = make([]int, 0) } func generateLineItems(result []int) []opts.LineData { items := make([]opts.LineData, 0) for i := 0; i < len(result); i++ { items = append(items, opts.LineData{Value: result[i]}) } return items } var err_result []string var duration int func main() { //log logFile, err := os.OpenFile("log.txt", os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666) if err != nil { panic(err) } defer logFile.Close() mw := io.MultiWriter(os.Stdout, logFile) log.SetOutput(mw) connID = 0 var connID int //1,listen to client's request listener, err := net.Listen("tcp", "192.168.32.144:20000") if err != nil { log.Println("start tcp server 192.168.32.144:20000 failed", err) return } initNameListAndBatchList() initMetric() initDb() for i := 0; i < 35; i++ { log.Println("--------------------------------------------------the", i, "th round starts----------------------------------------------") eliminated_user_channel = make(chan string, 100000) active_user_channel = make(chan string, 100000) online_msg_number = 0 online_msg_number_empty = 0 round = i cov_rec = 0 real_rec = 0 conn, err := listener.Accept() if err != nil { log.Println("accept failed", err) return } connID = connID + 1 log.Println("connID", connID) peocessConn(conn, connID) h24 := refresh_learningPhase_flag() arrivalToLearn1() learning_To_online_batchList(h24) learning_function(h24) online_msg_in_each_group_in_each_round_middle = make([]int, k_val*(groupSet_Number)) online_cache_process() log.Println("online msg number=", online_msg_number, "online_msg_number_empty=", online_msg_number_empty) record_group_size() log.Println("NameList length", len(NameList)) log.Println("ov_rec,real_rec numbers ", cov_rec, real_rec) log.Println("---------------------------------------------------next round---------------------------------------------------") } x_zhou := make([]int, len(group_list)) for i := 0; i < len(group_list); i++ { x_zhou[i] = i } ave_groupsize := make([]int, len(group_size_in_each_round[0])) for i, v := range group_list { fmt.Println(i, v) line := charts.NewLine() line.SetGlobalOptions( charts.WithInitializationOpts(opts.Initialization{Theme: types.ThemeWesteros}), charts.WithTitleOpts(opts.Title{ Title: ("group_id=" + strconv.Itoa(v.Group_Id)), Subtitle: "k_val=" + strconv.Itoa(k_val) + ";delay_limitation=" + strconv.Itoa(delay_limitation) + ";Group_Active_Threshold=" + strconv.Itoa(v.Group_Active_Threshold), })) log.Println("group_id=", i, "group,group_size changes as", group_size_in_each_round[v.Group_Id]) //average group size if !v.Group_low_frequence { for i, v := range group_size_in_each_round[v.Group_Id] { if v == 0 { ave_groupsize[i] = ave_groupsize[i] + 1 } else { ave_groupsize[i] = ave_groupsize[i] + v } } } log.Println("group_id=", i, "group,in each round,active user numbers are", online_msg_in_each_group_in_each_round[v.Group_Id]) line.SetXAxis(x_zhou). AddSeries("Category A", generateLineItems(online_msg_in_each_group_in_each_round[v.Group_Id])). AddSeries("Category B", generateLineItems(group_size_in_each_round[v.Group_Id])). SetSeriesOptions(charts.WithLineChartOpts(opts.LineChart{Step: true, ConnectNulls: true})) name1 := "group_" name2 := ".html" name := name1 + strconv.Itoa(i) + name2 w, _ := os.Create(name) line.Render(w) } log.Println(err_result) log.Println("k_val=", k_val, "duration=", duration) log.Println("arrival_phase_number, average_arrival_length", arrival_phase_number_list, average_arrival_length) sum_number := 0 sum_length := 0 sum_duration := time.Duration(0) sum_send_times := 0 for _, v := range group_list { if !v.Group_low_frequence { //average arrival length log.Println(v.Group_Id, "Group_size_copy=", v.Group_Size_copy, "online sum length", v.Group_online_sum_length) sum_length = v.Group_online_sum_length + sum_length sum_number = sum_number + v.Group_Size_copy //online msg' latency sum_duration = sum_duration + v.Group_delay_duration_sum sum_send_times = sum_send_times + v.Group_send_times log.Println("group id = ", v.Group_Id, "group,online msg average latency=", v.Group_delay_duration_sum/time.Duration(v.Group_send_times), "Group_outlier_flag", v.Group_outlier_flag) } } log.Println("average online length=", float64(sum_length)/float64(sum_number), "user sum_number", sum_number, "online phase total length", sum_length, "online average latency", sum_duration/time.Duration(sum_send_times)) ave_groupsize_double := make([]float64, len(ave_groupsize)) for i, v := range ave_groupsize { ave_groupsize_double[i] = math.Round((float64(v) / float64(k_val-1))) } log.Println("average group size=", ave_groupsize_double) log.Println("sum of group size=", ave_groupsize) log.Println("delay_limitation=", delay_limitation, "outlier_delay_limitation", outlier_delay_limitation, "group_Threshold_global", group_Threshold_global, "Group_sleep_times_threshold", Group_sleep_times_threshold) }