package main

import (
	"encoding/json"
	"fmt"
	"io"
	"io/ioutil"
	"log"
	"math"
	"net"
	"os"
	"os/exec"
	"strconv"
	"strings"
	"sync"
	"time"

	// "github.com/go-echarts/go-echarts/v2/charts"
	// "github.com/go-echarts/go-echarts/v2/opts"
	// "github.com/go-echarts/go-echarts/v2/types"
	"github.com/go-echarts/go-echarts/v2/charts"
	"github.com/go-echarts/go-echarts/v2/opts"
	"github.com/go-echarts/go-echarts/v2/types"
	"github.com/gomodule/redigo/redis"
)

// channelSize is the buffer capacity used for every pipeline channel;
// set in initMetric() to Batch_list_threshold + 10000.
var channelSize int

// connID counts accepted client connections (incremented once per round in main).
var connID int

// Rec is the decoded form of one incoming JSON message.
// NOTE(review): the JSON field names contain the typo "Megred" (presumably
// "Merged"); they must match whatever the remote sender emits, so the
// spelling cannot be changed here without coordinating with the peer.
type Rec struct {
	Megred_Timestamp []string // message timestamps, format "20060102150405"
	Merged_Hashtags  []string // message payload hashtags
	Megred_User_Id   string   // sender's user id (used as the Redis key)
	Megred_Ip        string   // sender's IP
	Cover_Msg        string   // "yes" when this is a cover (dummy) message
}

// Arrival-phase bookkeeping: the round in which each new user first appeared,
// and per-phase summary statistics. Each slice has a dedicated mutex.
var arrival_phase_start_list []int
var arrival_phase_start_list_lock sync.Mutex
var arrival_phase_number_list []int
var arrival_phase_number_list_lock sync.Mutex
var average_arrival_length []int

// Status maps user id -> phase: 0 = arrival, 1 = learning, 2 = online.
var Status sync.Map

// NameList records every user ever seen; Batch_List is the pending batch
// waiting to enter the learning phase. Both are append-guarded by their locks.
var NameList []string
var NameList_Append_Lock sync.Mutex
var Batch_List []string
var Batch_List_Append_Lock sync.Mutex

// Batch_list_threshold is the batch size that triggers the learning phase
// (set to 10000 in initMetric).
var Batch_list_threshold int

// putInChannel feeds raw incoming packets to the pool of receive() workers.
var putInChannel chan string
var receive_wg sync.WaitGroup
var peocessConn_wg sync.WaitGroup

// NOTE(review): uid_channel / uip_channel appear unused in this file.
var uid_channel chan string
var uip_channel chan string

// Key/value channel pairs feeding the three Redis writer goroutines.
// Each pair is sent to under its mutex so key and value stay aligned.
var db14_agentqueue_key chan string // db14: arrival-phase queue
var db14_agentqueue_val chan string
var db14_agentqueue_Lock sync.Mutex
var db13_learning_key chan string // db13: learning-phase queue (val: 0=cover, 1=real)
var db13_learning_val chan int
var db13_learning_Lock sync.Mutex
var db12_online_key chan string // db12: online-phase queue
var db12_online_val chan string
var db12_online_Lock sync.Mutex

// Per-round online message counters.
var online_msg_number int
var online_msg_number_empty int

// Per-group history: online message counts and group sizes per round,
// keyed by Group_Id.
var online_msg_in_each_group_in_each_round map[int][]int
var group_size_in_each_round map[int][]int

// Per-round scratch counters of online messages per group, guarded by the
// mutex below (named _wg but it is a Mutex, not a WaitGroup).
var online_msg_in_each_group_in_each_round_middle_wg sync.Mutex
var online_msg_in_each_group_in_each_round_middle []int

// user_group_hashmap maps user id -> anonymity-set (group) id.
var user_group_hashmap map[string]int

// sendDBToReceiver carries user ids whose db14 queues must be flushed to the
// receiver when they move into the learning phase.
var sendDBToReceiver chan string
var cleanDBToReceiver_wg sync.WaitGroup
var sendDBToReceiver_lock sync.Mutex

// Per-round counts of cover vs. real messages seen in the learning phase.
var cov_rec int
var real_rec int
var cov_lock sync.Mutex
var real_lock sync.Mutex

// packetLength is the fixed wire size of one message (3000 bytes, see initMetric).
var packetLength int

// total_cover counts cover messages observed in the online phase.
var total_cover int
var total_cover_lock sync.Mutex
// groupSet_Number counts how many batches of k_val groups have been created;
// group ids are offset by k_val*groupSet_Number so ids stay globally unique.
var groupSet_Number int

// number_group_hashmap maps group id -> *Group.
var number_group_hashmap map[int]*Group

// Group is one anonymity set of users and its per-round accounting.
type Group struct {
	Group_Id                  int
	Group_User_List           []string
	Group_Size                int
	Group_Size_copy           int // cumulative count of users eliminated from this group
	Group_Active_Threshold    int // online threshold percentage, self-adjusted in [27,33]
	Group_active              bool
	Group_low_frequence       bool // true for the last group of each set
	Group_start_round         int
	Group_online_sum_length   int
	Group_delay_duration_sum  time.Duration
	Group_outlier_flag        bool // true for the second-to-last group of each set
	Group_send_times          int
	Group_sleep_times         int
	Group_cover               int
	Group_delay_in_each_round []time.Duration
}

var group_list []*Group

// receive is one of 500 worker goroutines: it pulls raw packets from
// putInChannel, trims trailing padding after the final '}', decodes the JSON
// into a Rec, and routes the message by the sender's phase (Status):
// unknown -> register + db14, 0 -> db14, 1 -> db13 + echo to receiver c,
// 2 -> db12 + per-group online counters.
func receive(c net.Conn) {
	defer receive_wg.Done()
	for {
		a, ok_return := <-putInChannel
		if !ok_return {
			return
		}
		mid := []byte(a)
		length := 0
		if a == "" {
			continue
		}
		// Scan backwards for the last '}' to strip zero-padding from the
		// fixed-size read buffer.
		// NOTE(review): if no '}' exists, this loop logs at i < 0 but does not
		// stop; the next iteration would index mid[-1] and panic — confirm
		// inputs always contain '}'.
		for i := len(a) - 1; ; i-- {
			if i < 0 {
				log.Println("err,a=", a, "len(a)=", len(a), "len(mid)=", len(mid), "mid=", mid)
			}
			if mid[i] == 125 { // 125 == '}'
				length = i
				break
			}
		}
		mid = mid[:length+1]
		var stru map[string]interface{}
		err_unmarshal := json.Unmarshal(mid, &stru)
		if err_unmarshal != nil {
			log.Println("json,err_unmarshal=", err_unmarshal, "len(mid=)", len(mid), "len(a)=", len(a))
		}
		var structure Rec
		// 1. timestamps
		times, err_time := redis.Strings(stru["Megred_Timestamp"], nil)
		if err_time != nil {
			log.Println("err_time", err_time, "a=", a)
		}
		structure.Megred_Timestamp = times
		// 2. message hashtags
		msgs, err_msgs := redis.Strings(stru["Megred_Hashtags"], nil)
		if err_msgs != nil {
			log.Println("err_msgs", err_msgs, "a", a)
		}
		structure.Merged_Hashtags = msgs
		// 3. user id
		uid, err_uid := redis.String(stru["Megred_User_Id"], nil)
		if err_uid != nil {
			log.Println("err_uid", err_uid, "a", a)
		}
		structure.Megred_User_Id = uid
		// 4. ip
		ip, err_ip := redis.String(stru["Megred_Ip"], nil)
		if err_ip != nil {
			log.Println("err_ip", err_ip, "a", a)
		}
		structure.Megred_Ip = ip
		// 5. cover flag
		cov, err_cover := redis.String(stru["Cover_Msg"], nil)
		if err_cover != nil {
			log.Println("err_cover", err_cover)
			break // NOTE(review): break exits this worker entirely (unlike the other field errors, which continue)
		}
		structure.Cover_Msg = cov
		// Route by phase (different Redis DB per phase).
		// Unknown user: register in arrival phase and queue into db14.
		staa, ok := Status.Load(structure.Megred_User_Id)
		if !ok {
			if structure.Megred_User_Id == "" {
				continue
			}
			Status.Store(structure.Megred_User_Id, 0)
			NameList_Append_Lock.Lock()
			NameList = append(NameList, structure.Megred_User_Id)
			NameList_Append_Lock.Unlock()
			Batch_List_Append_Lock.Lock()
			Batch_List = append(Batch_List, structure.Megred_User_Id)
			Batch_List_Append_Lock.Unlock()
			// Lock keeps the key/val sends paired for the db14 writer.
			db14_agentqueue_Lock.Lock()
			db14_agentqueue_key <- structure.Megred_User_Id
			db14_agentqueue_val <- a
			db14_agentqueue_Lock.Unlock()
			arrival_phase_start_list_lock.Lock()
			arrival_phase_start_list = append(arrival_phase_start_list, round)
			arrival_phase_start_list_lock.Unlock()
			continue
		} else {
			// a. arrival phase
			if staa == 0 {
				db14_agentqueue_Lock.Lock()
				db14_agentqueue_key <- structure.Megred_User_Id
				db14_agentqueue_val <- a
				db14_agentqueue_Lock.Unlock()
				continue
			}
			// b. learning phase: record cover (0) / real (1) and echo the
			// trimmed message straight to the receiver connection.
			if staa == 1 {
				if structure.Cover_Msg == "yes" {
					db13_learning_Lock.Lock()
					db13_learning_key <- structure.Megred_User_Id
					db13_learning_val <- 0
					db13_learning_Lock.Unlock()
					cov_lock.Lock()
					cov_rec++
					cov_lock.Unlock()
				} else {
					db13_learning_Lock.Lock()
					db13_learning_key <- structure.Megred_User_Id
					db13_learning_val <- 1
					db13_learning_Lock.Unlock()
					real_lock.Lock()
					real_rec++
					real_lock.Unlock()
				}
				c.Write(mid)
				continue
			}
			// c. online phase: queue into db12 and bump this group's counter.
			if staa == 2 {
				db12_online_Lock.Lock()
				db12_online_key <- structure.Megred_User_Id
				db12_online_val <- a
				online_msg_number++ // guarded by db12_online_Lock
				db12_online_Lock.Unlock()
				online_msg_in_each_group_in_each_round_middle_wg.Lock()
				gid := user_group_hashmap[structure.Megred_User_Id]
				online_msg_in_each_group_in_each_round_middle[gid]++
				online_msg_in_each_group_in_each_round_middle_wg.Unlock()
				if structure.Cover_Msg == "yes" {
					total_cover_lock.Lock()
					total_cover++
					total_cover_lock.Unlock()
				}
				continue
			}
		}
	}
}

// db14: arrival-phase writer (same as v1).
var db14_lpush_wg sync.WaitGroup
var db14_lpush_lock sync.Mutex

// db14_lpush drains the db14 key/val channel pair and LPUSHes each message
// onto the user's list in Redis db 14. It exits once the channels are closed
// AND db13_db14_db12_waitForReceive_flag has been raised by peocessConn.
func db14_lpush() {
	defer db14_lpush_wg.Done()
	arrival_msg_num := 0
	// NOTE(review): log text says "rpush" but this function LPUSHes.
	log.Println("db14_rpush() starts")
	c_db14, _ := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
	c_db14.Do("select", 14)
	for {
		// Lock keeps the two receives paired with one sender's pair.
		db14_lpush_lock.Lock()
		x, ok1 := <-db14_agentqueue_key
		y, ok2 := <-db14_agentqueue_val
		db14_lpush_lock.Unlock()
		if ok1 && ok2 {
			c_db14.Do("lpush", x, y)
			arrival_msg_num++
		}
		if (!ok1 || !ok2) && db13_db14_db12_waitForReceive_flag {
			log.Println("db14_lpush() ends,arrival_msg_num=", arrival_msg_num)
			return
		}
	}
}

// db13: learning-phase writer (same as v1).
var db13_rpush_wg sync.WaitGroup
var db13_rpush_lock sync.Mutex

// db13_rpush drains the db13 channel pair and RPUSHes the 0/1 cover flag onto
// the user's list in Redis db 13, counting covers and real messages.
func db13_rpush() {
	defer db13_rpush_wg.Done()
	log.Println("db13_rpush() starts")
	real := 0
	cov := 0
	c_db13, _ := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
	c_db13.Do("select", 13)
	for {
		db13_rpush_lock.Lock()
		x, ok1 := <-db13_learning_key
		y, ok2 := <-db13_learning_val
		db13_rpush_lock.Unlock()
		if ok1 && ok2 {
			if y == 0 {
				cov = cov + 1
			} else {
				real = real + 1
			}
			c_db13.Do("rpush", x, y)
		}
		if db13_db14_db12_waitForReceive_flag && (!ok1 || !ok2) {
			log.Println("db13_rpush()ends, real,cov", real, cov, "total numbers", real+cov)
			return
		}
	}
}

// db12: online-phase writer (same as v1).
var db12_lpush_wg sync.WaitGroup
var db12_lpush_lock sync.Mutex
var test_list []string // debug record of every key seen by db12_lpush

// db12_lpush drains the db12 channel pair and LPUSHes each online message
// onto the user's list in Redis db 12.
func db12_lpush() {
	defer db12_lpush_wg.Done()
	online_msg := 0
	log.Println("db12_lpush()starts")
	c_db12, _ := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
	c_db12.Do("select", 12)
	for {
		db12_lpush_lock.Lock()
		x, ok1 := <-db12_online_key
		test_list = append(test_list, x)
		y, ok2 := <-db12_online_val
		db12_lpush_lock.Unlock()
		if ok1 && ok2 {
			c_db12.Do("lpush", x, y)
			online_msg++
		}
		if db13_db14_db12_waitForReceive_flag && (!ok1 || !ok2) {
			log.Println("db12_lpush() ends,online_msg", online_msg)
			return
		}
	}
}

// initDb flushes Redis databases 14, 13 and 12 (same as v1).
func initDb() {
	log.Println("initDb()")
	c, c_err := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
	if c_err != nil {
		log.Println("c_err=", c_err)
	}
	c.Do("select", 14)
	c.Do("Flushdb")
	c.Do("select", 13)
	c.Do("Flushdb")
	c.Do("select", 12)
	c.Do("Flushdb")
}

// initChannelAndListAndMap (re)creates all pipeline channels with capacity
// channelSize; called at the start of every round by peocessConn (same as v1).
func initChannelAndListAndMap() {
	log.Println("initChannelAndListAndMap starts")
	putInChannel = make(chan string, channelSize)
	db14_agentqueue_key = make(chan string, channelSize)
	db14_agentqueue_val = make(chan string, channelSize)
	db13_learning_key = make(chan string, channelSize)
	db13_learning_val = make(chan int, channelSize)
	db12_online_key = make(chan string, channelSize)
	db12_online_val = make(chan string, channelSize)
}

// db13_db14_db12_waitForReceive_flag tells the three Redis writers that the
// round is over and they may exit once their channels drain.
// NOTE(review): read/written from multiple goroutines without synchronization
// — a data race under `go test -race`; an atomic.Bool would be the safe form.
var db13_db14_db12_waitForReceive_flag bool

// peocessConn handles one round's client connection: it (re)starts the writer
// goroutines and 500 receive() workers, reads fixed-size packets from conn
// until "roundEnd", re-aligns packets that do not start with '{', then closes
// the pipeline in order, updates each group's active/sleep state, and waits
// for all workers to finish. (Name keeps the original "peocess" typo; same as v1.)
func peocessConn(conn net.Conn, connID int) {
	db13_db14_db12_waitForReceive_flag = false
	log.Println("peocessConn starts")
	initChannelAndListAndMap()
	db14_lpush_wg.Add(1)
	go db14_lpush()
	db13_rpush_wg.Add(1)
	go db13_rpush()
	db12_lpush_wg.Add(1)
	go db12_lpush()
	c, c_err := net.Dial("tcp", "192.168.32.144:20001") //recipient address
	if c_err != nil {
		log.Println("conn err,dial 192.168.32.144:20001 failed", c_err)
	}
	for i := 0; i < 500; i++ {
		receive_wg.Add(1)
		go receive(c)
	}
	var tmp [3000]byte // packetLength-sized read buffer
	index := 0
	total_msg_number := 0
	for {
		total_msg_number++
		// NOTE(review): the byte count from Read is discarded; a short read
		// would leave stale bytes from the previous packet in tmp. The code
		// assumes each Read delivers one full packetLength packet — confirm.
		_, err := conn.Read(tmp[:])
		if err != nil {
			log.Println("read from conn failed", err)
			return
		}
		if string(tmp[:8]) == "roundEnd" {
			log.Println("client ends the conn,msg=", total_msg_number)
			break
		}
		// Packet does not start with '{' (123): we are mid-packet. Find the
		// next '{', read the missing prefix-length worth of bytes, and stitch
		// the two halves back into one packet.
		if tmp[0] != 123 {
			var corrext_tmp []byte
			var corrext_follow []byte
			var correct_length int
			for i, v := range tmp[:] {
				if v == 123 {
					correct_length = i
					corrext_follow = make([]byte, correct_length)
					_, err_follow := conn.Read(corrext_follow[:])
					if err_follow != nil {
						log.Println("read from conn failed,err_follow=", err_follow)
						return
					}
					// NOTE(review): append may write into tmp's backing array
					// when capacity allows; safe only because tmp is refilled
					// before reuse — verify.
					corrext_tmp = append(tmp[i:], corrext_follow[:]...)
					if len(corrext_tmp) != packetLength {
						log.Println("err", len(corrext_tmp))
					}
					break
				}
			}
			putInChannel <- string(corrext_tmp[:])
			index++
			log.Println("total_msg_number=", total_msg_number)
			continue
		}
		putInChannel <- string(tmp[:])
	}
	close(putInChannel)
	receive_wg.Wait()
	// Decide whether each anonymity set is online this round: a group is
	// active when its online-message count exceeds threshold% of its size.
	if len(group_list) > 0 {
		for _, v := range group_list {
			online_msg_in_each_group_in_each_round[v.Group_Id] = append(online_msg_in_each_group_in_each_round[v.Group_Id], online_msg_in_each_group_in_each_round_middle[v.Group_Id])
			number := float64(0)
			if v.Group_outlier_flag {
				number = float64(group_list[v.Group_Id].Group_Size*group_Threshold_outlier) / float64(100)
			} else {
				number = float64(group_list[v.Group_Id].Group_Size*group_list[v.Group_Id].Group_Active_Threshold) / float64(100)
			}
			if number >= float64(online_msg_in_each_group_in_each_round_middle[v.Group_Id]) {
				group_list[v.Group_Id].Group_active = false
				group_list[v.Group_Id].Group_sleep_times++
			} else {
				group_list[v.Group_Id].Group_active = true
				group_list[v.Group_Id].Group_sleep_times = 0
			}
		}
	}
	// Raise the shutdown flag BEFORE closing, so writers that observe closed
	// channels also see the flag and exit.
	db13_db14_db12_waitForReceive_flag = true
	close(db14_agentqueue_key)
	close(db14_agentqueue_val)
	close(db13_learning_key)
	close(db13_learning_val)
	close(db12_online_key)
	close(db12_online_val)
	db13_rpush_wg.Wait()
	db14_lpush_wg.Wait()
	db12_lpush_wg.Wait()
	log.Println("peocessConn ends,NameList length=", len(NameList), "Batch_List length", len(Batch_List))
}

// cleanDBToReceiver is one of 100 workers that, for each user id received on
// sendDBToReceiver, drains the user's db14 list (RPOP) and forwards each
// message as JSON over httpConn to the receiver (same as v1).
func cleanDBToReceiver(httpConn net.Conn) {
	defer cleanDBToReceiver_wg.Done()
	time.Sleep(time.Millisecond * 300)
	c, _ := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
	c.Do("select", 14)
	for {
		x, ok := <-sendDBToReceiver
		if ok {
			// Lock serializes the whole drain so writes to httpConn from the
			// 100 workers do not interleave.
			sendDBToReceiver_lock.Lock()
			queueLen, _ := c.Do("llen", x)
			ql, err_ql := redis.Int(queueLen, nil)
			if err_ql != nil {
				log.Println("db14 err_ql=", err_ql)
			}
			for i := 0; i < ql; i++ {
				result, err_rpop := c.Do("rpop", x)
				if err_rpop != nil {
					log.Println("db14 err_rpop=", err_rpop)
				}
				res, err_res := redis.String(result, nil)
				if err_res != nil {
					log.Println("db14 err_res=", err_res)
				}
				jsons, err_jsons := json.Marshal(res)
				if err_jsons != nil {
					log.Println("db14 err_jsons=", err_jsons)
				}
				httpConn.Write(jsons)
			}
			sendDBToReceiver_lock.Unlock()
		} else {
			return
		}
	}
}

var learningphase_start_round int
var round int // current round index, set by main
var last_Batch_List []string
var last_Batch_List_to_online []string

// lastLearningPhase_end_flag is true when the previous learning phase has
// finished and a new batch may enter learning.
var lastLearningPhase_end_flag bool

// arrivalToLearn1 moves the current batch from the arrival phase into the
// learning phase once Batch_List reaches Batch_list_threshold: it flushes the
// batch's db14 queues to the receiver, tells the client which users changed
// status, records arrival-phase statistics, and flushes db14. Returns true
// when a transition happened (same as v1).
func arrivalToLearn1() bool {
	log.Println("arrivalToLearn1() starts")
	time.Sleep(time.Millisecond * 300)
	if len(Batch_List) >= Batch_list_threshold {
		// Previous learning phase still running: tell the client nothing changed.
		if !lastLearningPhase_end_flag {
			httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002")
			if err1 != nil {
				log.Println("conn err dial 192.168.32.144:20002 failed", err1)
			}
			sendToCli := []byte("noChange")
			httpConn_client.Write(sendToCli)
			return false
		}
		learningphase_start_round = round
		last_Batch_List = make([]string, len(Batch_List))
		copy(last_Batch_List, Batch_List)
		httpConn_receiver, err2 := net.Dial("tcp", "192.168.32.144:20001")
		if err2 != nil {
			log.Println("conn err,dial 192.168.32.144:20001 failed", err2)
		}
		sendDBToReceiver = make(chan string, channelSize)
		for i := 0; i < 100; i++ {
			cleanDBToReceiver_wg.Add(1)
			go cleanDBToReceiver(httpConn_receiver)
		}
		// Arrival-phase statistics: average rounds spent in arrival.
		middle_sum_number := 0
		for _, v := range arrival_phase_start_list {
			middle_sum_number = middle_sum_number + round - v
		}
		arrival_phase_number_list_lock.Lock()
		average_arrival_length = append(average_arrival_length, middle_sum_number/len(arrival_phase_start_list))
		arrival_phase_number_list = append(arrival_phase_number_list, len(arrival_phase_start_list))
		arrival_phase_number_list_lock.Unlock()
		arrival_phase_start_list = make([]int, 0)
		httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002") //client address
		if err1 != nil {
			log.Println("conn err dial 192.168.32.144:20002 failed", err1)
		}
		// Promote each batched user to status 1 and stream the id list to the
		// client in chunks of ~6000 names.
		sendToClient := ""
		for i, v := range Batch_List {
			sendDBToReceiver <- v
			Status.Store(v, 1)
			sendToClient = sendToClient + v + ","
			if (i%6000 == 0 && i != 0) || i == len(Batch_List)-1 {
				sendToCli := []byte(sendToClient)
				httpConn_client.Write(sendToCli)
				sendToClient = ""
			}
		}
		log.Println("Batch_List length", len(Batch_List))
		close(sendDBToReceiver)
		time.Sleep(time.Second * 2)
		httpConn_client.Write([]byte("changeEnd"))
		Batch_List = make([]string, 0)
		cleanDBToReceiver_wg.Wait()
		c, c_err := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
		if c_err != nil {
			log.Println("c_err=", c_err)
		}
		c.Do("select", 14)
		c.Do("flushdb")
		lastLearningPhase_end_flag = false
		return true
	} else {
		httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002")
		if err1 != nil {
			log.Println("conn err dial 192.168.32.144:20002 failed", err1)
		}
		sendToCli := []byte("noChange")
		httpConn_client.Write(sendToCli)
		log.Println("no changed status")
		return false
	}
}

// refresh_learningPhase_flag returns true when the current learning phase has
// lasted exactly 24 rounds (so its batch should go online), except in the
// final rounds (round >= 33) where no transition happens (same as v1).
func refresh_learningPhase_flag() bool {
	defer log.Println("refresh_learningPhase_flag ends")
	log.Println("refresh_learningPhase_flag starts")
	if round-learningphase_start_round == 24 {
		log.Println("round=", round, "learningphase_start_round=", learningphase_start_round)
		lastLearningPhase_end_flag = true
		if round >= 33 {
			lastLearningPhase_end_flag = false
			return false
		}
		last_Batch_List_to_online = make([]string, len(last_Batch_List))
		copy(last_Batch_List_to_online, last_Batch_List)
		return true
	}
	return false
}

// learning_To_online_batchList promotes the finished learning batch to the
// online phase (status 2) and streams the id list to the client, or sends
// "noChange" when flag is false (same as v1).
func learning_To_online_batchList(flag bool) {
	log.Println("learning_To_online starts,flag=", flag)
	time.Sleep(time.Second * 1)
	if !flag {
		httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002")
		if err1 != nil {
			log.Println("conn err dial 192.168.32.144:20002 failed", err1)
		}
		sendToCli := []byte("noChange")
		httpConn_client.Write(sendToCli)
		return
	} else {
		httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002")
		if err1 != nil {
			log.Println("conn err dial 192.168.32.144:20002 failed", err1)
		}
		sendToClient := ""
		for i, v := range last_Batch_List_to_online {
			Status.Store(v, 2)
			sendToClient = sendToClient + v + ","
			if (i%6000 == 0 && i != 0) || i == len(last_Batch_List_to_online)-1 {
				sendToCli := []byte(sendToClient)
				httpConn_client.Write(sendToCli)
				sendToClient = ""
			}
		}
		time.Sleep(time.Second * 1)
		httpConn_client.Write([]byte("changeEnd"))
	}
}

// learning_function runs the external learning pipeline (output.sh, final.py,
// combine.sh), reads the resulting user list and grouping assignment from
// disk, and builds a fresh set of k_val groups: the last group collects
// low-frequency users, the second-to-last is the outlier group (same as v1).
func learning_function(flag bool) {
	defer log.Println("learning_function ends")
	log.Println("learning_function starts")
	if flag {
		log.Println("round=", round, "learningphase_start_round=", learningphase_start_round)
		// 1. export data for the learner
		_, err := exec.Command("/bin/bash", "/home/it/middle_data/output.sh").CombinedOutput()
		if err != nil {
			log.Println("output err,cmd.Output:", err)
			return
		}
		log.Println("output.sh ends")
		starttime := time.Now()
		// 2. run the clustering/learning step
		cmd, err_cmd := exec.Command("python3", "/home/it/learning-phase/final.py").CombinedOutput()
		if err_cmd != nil {
			log.Println("cmd.Output err,err_cmd=", err_cmd)
			return
		}
		log.Println("python result", string(cmd))
		cost := int(time.Since(starttime) / time.Second)
		duration = cost // record learning time (seconds) in the global
		// 3. merge the learner's outputs
		_, err_combine := exec.Command("/bin/bash", "/home/it/middle_data/combine.sh").CombinedOutput()
		if err_combine != nil {
			log.Println("combine err,cmd.Output:", err_combine)
			return
		}
		// 4. load results: m1 = user names, m2 = group index per user
		filePath1 := "/home/it/middle_data/filtered_good_name_list.txt"
		content1, err1 := ioutil.ReadFile(filePath1)
		if err1 != nil {
			panic(err1)
		}
		m1 := strings.Split(string(content1), "\n")
		filePath2 := "/home/it/middle_data/groupingResult.txt"
		content2, err2 := ioutil.ReadFile(filePath2)
		if err2 != nil {
			// NOTE(review): panics with err1 (which is nil here) instead of
			// err2 — almost certainly a copy-paste bug; would panic(nil).
			panic(err1)
		}
		m2 := strings.Split(string(content2), "\n")
		log.Println("m1 length=", len(m1), "m2 length=", len(m2))
		// 5. create k_val new groups with globally unique ids
		for i := 0; i < k_val; i++ {
			var g Group
			g.Group_Id = i + k_val*groupSet_Number
			g.Group_Active_Threshold = group_Threshold_global
			g.Group_Size = 0
			g.Group_Size_copy = 0
			g.Group_active = false
			g.Group_User_List = make([]string, 0)
			g.Group_start_round = round
			g.Group_online_sum_length = 0
			g.Group_delay_duration_sum = time.Duration(0)
			g.Group_send_times = 0
			g.Group_sleep_times = 0
			g.Group_cover = 0
			g.Group_delay_in_each_round = make([]time.Duration, 0)
			// last group of the set: low-frequency users
			if i == k_val-1 {
				g.Group_low_frequence = true
			} else {
				g.Group_low_frequence = false
			}
			// second-to-last group: outliers
			if i == k_val-2 {
				g.Group_outlier_flag = true
			} else {
				g.Group_outlier_flag = false
			}
			// per-group bookkeeping slices
			online_msg_in_each_group_in_each_round[g.Group_Id] = make([]int, 0)
			group_size_in_each_round[g.Group_Id] = make([]int, 0)
			group_list = append(group_list, &g)
			number_group_hashmap[i+k_val*groupSet_Number] = &g
		}
		// 6. assign each user to its learned group; users beyond the grouping
		// file go to the low-frequency group.
		for i := 0; i < len(m1)-1; i++ {
			if i < len(m2)-2 {
				intm2, err_m2 := strconv.Atoi(m2[i])
				if err_m2 != nil {
					log.Println("err_m2=", err_m2)
					return
				}
				user_group_hashmap[m1[i]] = intm2 + groupSet_Number*k_val
				middle_g, ok_middle_g := number_group_hashmap[intm2+groupSet_Number*k_val]
				if !ok_middle_g {
					log.Println("err,group id=", intm2+groupSet_Number*k_val)
				} else {
					middle_g.Group_Size++
					middle_g.Group_User_List = append(middle_g.Group_User_List, m1[i])
				}
			} else {
				user_group_hashmap[m1[i]] = (groupSet_Number+1)*k_val - 1
				middle_g, ok_middle_g := number_group_hashmap[(groupSet_Number+1)*k_val-1]
				if !ok_middle_g {
					log.Println("err,group id=", (groupSet_Number+1)*k_val-1)
					return
				} else {
					middle_g.Group_Size++
					middle_g.Group_User_List = append(middle_g.Group_User_List, m1[i])
				}
			}
		}
		sum := 0
		for i := 0; i < k_val; i++ {
			gg := number_group_hashmap[i+k_val*groupSet_Number]
			log.Println("id=", gg.Group_Id, "i+ k_val*groupSet_Number=", i+k_val*groupSet_Number, "的group,其size=", gg.Group_Size)
			sum = sum + gg.Group_Size
		}
		log.Println("groupSet_Number=", groupSet_Number)
		groupSet_Number++
		log.Println("groupSet_Number=", groupSet_Number)
		return
	} else {
		return
	}
}

var group_active_function_db12_lock sync.Mutex
var group_active_function_db12_rpop_lock sync.Mutex

// eliminated_user_channel carries users removed from their group this round;
// active_user_channel carries the raw messages to forward to the receiver.
var eliminated_user_channel chan string
var active_user_channel chan string

// Protocol parameters, all set in initMetric().
var k_val int                       // groups per learning batch
var group_Threshold_global int      // initial online-threshold percentage
var group_Threshold_outlier int     // threshold percentage for the outlier group
var delay_limitation int            // max queued messages before a user is inactive
var outlier_delay_limitation int    // NOTE(review): set but apparently unused
var Group_sleep_times_threshold int // NOTE(review): set but apparently unused

// group_active_function fires when a group is active: list_2 (inactive users)
// are eliminated — removed from Status, reported on eliminated_user_channel,
// and their db12 queues deleted — while list_1 (active users) each have one
// message popped from db12, forwarded on active_user_channel, and its latency
// accounted against a fixed simulated epoch (same as v1).
func group_active_function(gg *Group, list_1 []string, list_2 []string) {
	defer log.Println("group_id=", gg.Group_Id, "group_active_function ends,Group_Size", gg.Group_Size)
	log.Println("group_id=", gg.Group_Id, "group, group_active_function starts ,Group_Size", gg.Group_Size)
	c_active, err_active := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
	if err_active != nil {
		log.Println("err_active=", err_active)
		return
	}
	gg.Group_online_sum_length = gg.Group_online_sum_length + len(list_2)*(round-gg.Group_start_round)
	gg.Group_Size_copy = gg.Group_Size_copy + len(list_2)
	// Eliminate the inactive users.
	for _, v := range list_2 {
		x, load_ok := Status.Load(v)
		if load_ok {
			if x != 2 {
				err_result = append(err_result, v) // sanity check: should have been online
			}
		}
		Status.Delete(v)
		eliminated_user_channel <- v
		c_active.Do("select", 12)
		group_active_function_db12_lock.Lock()
		_, err_del := c_active.Do("del", v)
		group_active_function_db12_lock.Unlock()
		if err_del != nil {
			log.Println("err_del=", err_del)
			return
		}
	}
	gg.Group_Size = len(list_1)
	gg.Group_User_List = list_1
	c_active.Do("select", 12)
	each_round_delay := time.Duration(0)
	not_cover_number := 0
	// Pop and forward one message per active user, measuring latency.
	for _, v := range list_1 {
		x, load_ok := Status.Load(v)
		if load_ok {
			if x != 2 {
				err_result = append(err_result, v)
			}
		}
		group_active_function_db12_rpop_lock.Lock()
		result, err_rpop := c_active.Do("rpop", v)
		group_active_function_db12_rpop_lock.Unlock()
		if err_rpop != nil {
			log.Println("err_rpop=", err_rpop)
		}
		res, err_string := redis.String(result, nil)
		if err_string != nil {
			log.Println("err_string=", err_string)
		}
		active_user_channel <- res
		// Latency: trim padding after the final '}' and decode the message.
		mid := []byte(res)
		length := 0
		// NOTE(review): same unterminated backward scan as receive() — would
		// index mid[-1] if no '}' is present.
		for i := len(res) - 1; ; i-- {
			if i < 0 {
				log.Println("res,res=", res, "len(res)=", len(res), "len(mid)=", len(mid), "mid=", mid)
			}
			if mid[i] == 125 { // 125 == '}'
				length = i
				break
			}
		}
		mid = mid[:length+1]
		var stru map[string]interface{}
		err_unmarshal := json.Unmarshal(mid, &stru)
		if err_unmarshal != nil {
			log.Println("err_unmarshal=", err_unmarshal, "len(mid=)", len(mid))
		}
		cov, err_cover := redis.String(stru["Cover_Msg"], nil)
		if err_cover != nil {
			log.Println("err_cover", err_cover)
			break
		}
		if cov == "yes" {
			continue // cover messages are excluded from latency stats
		}
		not_cover_number = not_cover_number + 1
		gg.Group_send_times = gg.Group_send_times + 1
		times, err_time := redis.Strings(stru["Megred_Timestamp"], nil)
		if err_time != nil {
			log.Println("err_time", err_time, "a=", res)
		}
		msg_time := times[0]
		loc, err_loc := time.LoadLocation("Local")
		if err_loc != nil {
			fmt.Println("err_loc=", err_loc)
		}
		the_time_of_msg, err_the_time_msg := time.ParseInLocation("20060102150405", msg_time, loc)
		if err_the_time_msg != nil {
			fmt.Println("err_the_time_msg=", err_the_time_msg)
		}
		// Simulated clock: epoch 2012-11-01 12:00:00, one hour per round.
		the_time_of_agent, err_the_time_agent := time.ParseInLocation("20060102150405", "20121101120000", loc)
		if err_the_time_agent != nil {
			fmt.Println("err_the_time_agent=", err_the_time_agent)
		}
		send_out_timepoint := the_time_of_agent.Add(time.Duration(round+1) * time.Hour)
		duration := send_out_timepoint.Sub(the_time_of_msg)
		gg.Group_delay_duration_sum = gg.Group_delay_duration_sum + duration
		each_round_delay = each_round_delay + duration
	}
	if not_cover_number != 0 {
		ave_delay := each_round_delay / time.Duration(not_cover_number)
		gg.Group_delay_in_each_round = append(gg.Group_delay_in_each_round, ave_delay)
	}
}

var each_group_process_lock sync.Mutex
var each_group_process_wg sync.WaitGroup

// each_group_process classifies each user of group g as active/inactive by
// the length of their db12 queue (outlier groups use an inverted rule), then,
// if the group is active this round, hands both lists to
// group_active_function (same as v1).
func each_group_process(g *Group) {
	defer each_group_process_wg.Done()
	c_db12, err_db12 := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
	if err_db12 != nil {
		log.Println("err_db12=", err_db12)
		return
	}
	c_db12.Do("select", 12)
	middle_active_list := make([]string, 0)
	middle_inactive_list := make([]string, 0)
	// A singleton group offers no anonymity: its user is always inactive.
	if g.Group_Size == 1 {
		middle_inactive_list = append(middle_inactive_list, g.Group_User_List[0])
	} else {
		for _, v := range g.Group_User_List {
			each_group_process_lock.Lock()
			agent_queue_length, err_agent_queue_length := c_db12.Do("llen", v)
			each_group_process_lock.Unlock()
			if err_agent_queue_length != nil {
				log.Println("err_agent_queue_length=", err_agent_queue_length)
				return
			}
			length, err_length := redis.Int(agent_queue_length, nil)
			if err_length != nil {
				log.Println("err_length=", err_length)
				return
			}
			if g.Group_outlier_flag {
				// outlier rule: empty queue or over the limit -> inactive
				if length == 0 || length >= delay_limitation {
					middle_inactive_list = append(middle_inactive_list, v)
				} else {
					middle_active_list = append(middle_active_list, v)
				}
			} else {
				// normal rule: 1..delay_limitation queued messages -> active
				if length >= 1 && length <= delay_limitation {
					middle_active_list = append(middle_active_list, v)
				} else {
					middle_inactive_list = append(middle_inactive_list, v)
				}
			}
		}
	}
	number := float64(0)
	// threshold count is only computed for the outlier group (log output only)
	if g.Group_outlier_flag {
		number = float64(g.Group_Size*g.Group_Active_Threshold) / float64(100)
	}
	if g.Group_active {
		log.Println("group_id=", g.Group_Id, "group,threshold number=", number, "active user number=", len(middle_active_list))
		group_active_function(g, middle_active_list, middle_inactive_list)
	} else {
		log.Println("group_id=", g.Group_Id, "group,threshold number=", number, "active user number", len(middle_active_list))
	}
	log.Println("group_id=", g.Group_Id, "group is ", g.Group_active, ",active user number=", len(middle_active_list), "inactive user number =", len(middle_inactive_list), "group_size=", g.Group_Size, "Group_User_List length=", len(g.Group_User_List))
}

var db12_send_to_receiver_wg sync.WaitGroup
var active_user_channel_closed bool

// db12_send_to_receiver forwards each message from active_user_channel to the
// receiver connection as JSON; it spins on zero-value receives until the
// closed flag is raised (same as v1).
func db12_send_to_receiver(c net.Conn) {
	defer db12_send_to_receiver_wg.Done()
	for {
		x, ok := <-active_user_channel
		if ok {
			jsons, err_jsons := json.Marshal(x)
			if err_jsons != nil {
				log.Println("db14, err_jsons=", err_jsons)
			}
			c.Write(jsons)
		} else {
			// NOTE(review): channels are nil until main makes them, and ok is
			// only false once closed; the closed flag guards against exiting
			// before online_cache_process has closed the channel.
			if active_user_channel_closed {
				return
			} else {
				continue
			}
		}
	}
}

var eliminated_user_channel_closed bool
var online_eliminated_batchList_wg sync.WaitGroup

// online_eliminated_batchList collects every user eliminated this round and
// streams the list (chunks of ~6000) to the client, or "noChange" when no one
// was eliminated (same as v1).
func online_eliminated_batchList() {
	defer online_eliminated_batchList_wg.Done()
	log.Println("online_eliminated_batchList starts")
	time.Sleep(time.Second * 2)
	httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002") //client address
	if err1 != nil {
		log.Println("online_eliminated_batchList conn err dial 192.168.32.144:20002 failed", err1)
	}
	number := 0
	eliminated_list := make([]string, 0)
	for {
		v, ok := <-eliminated_user_channel
		if ok {
			number++
			eliminated_list = append(eliminated_list, v)
		} else {
			if eliminated_user_channel_closed {
				break
			} else {
				continue
			}
		}
	}
	if len(eliminated_list) > 0 {
		sendToClient := ""
		for i, v := range eliminated_list {
			sendToClient = sendToClient + v + ","
			if (i%6000 == 0 && i != 0) || i == len(eliminated_list)-1 {
				sendToCli := []byte(sendToClient)
				httpConn_client.Write(sendToCli)
				sendToClient = ""
			}
		}
		time.Sleep(time.Second * 2)
		httpConn_client.Write([]byte("changeEnd"))
	} else {
		httpConn_client.Write([]byte("noChange"))
	}
	log.Println("len(eliminated_list)", len(eliminated_list), "number=", number)
}

// online_cache_process runs the online phase for the round: it starts 10
// forwarder goroutines plus the eliminated-user reporter, processes every
// group sequentially, then closes both channels (raising their closed flags)
// and waits for all helpers (same as v1).
func online_cache_process() {
	c, c_err := net.Dial("tcp", "192.168.32.144:20001") //receiver address
	log.Println("online_cache_process starts")
	if c_err != nil {
		log.Println("online msg db12 conn err dial 192.168.32.144:20001 failed", c_err)
	}
	active_user_channel_closed = false
	eliminated_user_channel_closed = false
	for i := 0; i < 10; i++ {
		db12_send_to_receiver_wg.Add(1)
		go db12_send_to_receiver(c)
	}
	online_eliminated_batchList_wg.Add(1)
	go online_eliminated_batchList()
	log.Println("group_list length", len(group_list))
	// NOTE(review): each_group_process is called synchronously (no `go`)
	// despite the WaitGroup — groups are processed one at a time.
	for _, v := range group_list {
		each_group_process_wg.Add(1)
		each_group_process(v)
	}
	each_group_process_wg.Wait()
	close(active_user_channel)
	active_user_channel_closed = true
	close(eliminated_user_channel)
	eliminated_user_channel_closed = true
	db12_send_to_receiver_wg.Wait()
	online_eliminated_batchList_wg.Wait()
}

// record_group_size appends each group's current size to its per-round history.
func record_group_size() {
	for _, v := range group_list {
		group_size_in_each_round[v.Group_Id] = append(group_size_in_each_round[v.Group_Id], v.Group_Size)
	}
}

// initMetric sets all protocol parameters for the experiment.
func initMetric() {
	log.Println("initMetric() starts")
	Batch_list_threshold = 10000
	channelSize = Batch_list_threshold + 10000
	round = 0
	lastLearningPhase_end_flag = true
	packetLength = 3000
	k_val = 15
	group_Threshold_global = 30
	group_Threshold_outlier = 30
	delay_limitation = 2
	outlier_delay_limitation = 2
	Group_sleep_times_threshold = 5
}

// initNameListAndBatchList resets all global lists and maps to empty.
func initNameListAndBatchList() {
	log.Println("initNameListAndBatchList() starts")
	Batch_List = make([]string, 0)
	NameList = make([]string, 0)
	number_group_hashmap = make(map[int]*Group)
	groupSet_Number = 0
	group_list = make([]*Group, 0)
	online_msg_in_each_group_in_each_round = make(map[int][]int)
	user_group_hashmap = make(map[string]int)
	group_size_in_each_round = make(map[int][]int)
	err_result = make([]string, 0)
	arrival_phase_start_list = make([]int, 0)
	average_arrival_length = make([]int, 0)
	arrival_phase_number_list = make([]int, 0)
}

// generateLineItems converts an int series into go-echarts line data points.
func generateLineItems(result []int) []opts.LineData {
	items := make([]opts.LineData, 0)
	for i := 0; i < len(result); i++ {
		items = append(items, opts.LineData{Value: result[i]})
	}
	return items
}

// err_result collects users whose Status was unexpectedly not 2 during
// online processing (diagnostic only).
var err_result []string

// duration holds the last learning phase's wall-clock cost in seconds.
var duration int

// before is the only difference between v1 and v2: at the start of each round
// it accepts the list of currently-active user ids on :20010, counts active
// users per anonymity set, decides per set whether its members should send
// this round (adjusting each set's threshold within [27,33]), and reports the
// "should send" list back to the client on :20012.
func before() {
	send := make([]string, 0)     // users who should send a message this round
	not_send := make([]string, 0) // users who should not send this round
	// Receive all active users' uids.
	log.Println("before() starts")
	listener, err := net.Listen("tcp", "192.168.32.144:20010")
	if err != nil {
		fmt.Println("before as server,listen 192.168.32.144:20010 failed", err)
		return
	}
	conn, err := listener.Accept()
	if err != nil {
		fmt.Println("err,accept failed", err)
		return
	}
	var tmp [10000]byte
	res := ""
	flag := false
	for {
		n, err := conn.Read(tmp[:])
		if err != nil {
			log.Println("err,read from conn failed", err)
			break
		}
		// "noChange": no active users this round, nothing to accumulate.
		if string(tmp[:8]) == "noChange" {
			log.Println("no before() active users in this round")
			listener.Close()
			flag = false
			break
		} else {
			result := string(tmp[:n])
			res = res + result
			if string(tmp[:9]) == "changeEnd" {
				fmt.Println("there are active users in this round")
				listener.Close()
				flag = true
				break
			}
		}
	}
	// Count active users per group.
	middle_each_group := make(map[int]int)
	if flag {
		res = strings.Replace(res, " ", "", -1)
		res = strings.Replace(res, "\n", "", -1)
		str_arr := strings.Split(res, ",")
		// active-user count per group (last split element is empty from the
		// trailing comma, hence len-1)
		for i := 0; i < len(str_arr)-1; i++ {
			gid := user_group_hashmap[str_arr[i]]
			middle_each_group[gid]++
		}
		log.Println("before,active users number", len(str_arr)-1)
		for gid, v := range middle_each_group {
			log.Println("gid=", gid, "threshold*size=", float64(group_list[gid].Group_Active_Threshold)*float64(group_list[gid].Group_Size)/float64(100), "v=", v)
			if float64(group_list[gid].Group_Active_Threshold)*float64(group_list[gid].Group_Size)/float64(100) < float64(v) {
				// anonymity set is online: everyone in the set sends
				send = append(send, group_list[gid].Group_User_List...)
				// self-adjusting online threshold, bounded to [27,33]
				if group_list[gid].Group_Active_Threshold < 33 {
					group_list[gid].Group_Active_Threshold++
				}
			} else {
				// anonymity set is offline: no one in the set sends
				not_send = append(not_send, group_list[gid].Group_User_List...)
				if group_list[gid].Group_Active_Threshold > 27 {
					group_list[gid].Group_Active_Threshold--
				}
			}
		}
	} else {
		// NOTE(review): this branch means there were NO active users; the log
		// text says the opposite.
		log.Println("before has active users")
	}
	log.Println("before, ", len(send), "users should send message in this round and", len(not_send), "users should not send message")
	// Send this feedback to the client.
	time.Sleep(time.Second * 2)
	c1, c_err := net.Dial("tcp", "192.168.32.144:20012") //client address
	if c_err != nil {
		log.Println("err,conn before in client ,err,dial 192.168.32.144:20012 failed", c_err)
	}
	if len(send) > 0 {
		sendToAgent := ""
		for i, v := range send {
			sendToAgent = sendToAgent + v + ","
			if (i%6000 == 0 && i != 0) || i == len(send)-1 {
				sendToCli := []byte(sendToAgent)
				c1.Write(sendToCli)
				sendToAgent = ""
			}
		}
		time.Sleep(time.Second * 2)
		c1.Write([]byte("changeEnd"))
	} else {
		c1.Write([]byte("noChange"))
	}
	log.Println("before ends, ", len(send), "users should send,", len(not_send), "users should not send in this round")
}

// main drives 35 rounds of the protocol — before(), one client connection via
// peocessConn(), then the learning/online phase transitions — and finally
// renders per-group charts and prints summary statistics.
func main() {
	// Log to both stdout and log.txt.
	logFile, err := os.OpenFile("log.txt", os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666)
	if err != nil {
		panic(err)
	}
	defer logFile.Close()
	mw := io.MultiWriter(os.Stdout, logFile)
	log.SetOutput(mw)
	connID = 0
	// NOTE(review): this local declaration shadows the global connID just set
	// above; all increments below affect only the local copy.
	var connID int
	listener, err := net.Listen("tcp", "192.168.32.144:20000")
	if err != nil {
		log.Println("start tcp server 192.168.32.144:20000 failed", err)
		return
	}
	initNameListAndBatchList()
	initMetric()
	initDb()
	for i := 0; i < 35; i++ {
		log.Println("--------------------------------------------------the ", i, "th round starts----------------------------------------------")
		eliminated_user_channel = make(chan string, 100000)
		active_user_channel = make(chan string, 100000)
		online_msg_number = 0
		online_msg_number_empty = 0
		round = i
		cov_rec = 0
		real_rec = 0
		before()
		conn, err := listener.Accept()
		if err != nil {
			log.Println("accept failed", err)
			return
		}
		connID = connID + 1
		log.Println("connID", connID)
		peocessConn(conn, connID)
		h24 := refresh_learningPhase_flag()
		arrivalToLearn1()
		learning_function(h24)
		learning_To_online_batchList(h24)
		// Fresh per-group counters sized to the total number of groups so far.
		online_msg_in_each_group_in_each_round_middle = make([]int, k_val*(groupSet_Number))
		online_cache_process()
		log.Println("online_msg_number=", online_msg_number, "online_msg_number_empty=", online_msg_number_empty)
		record_group_size()
		log.Println("len(NameList)", len(NameList))
		log.Println("cov_rec,real_rec are", cov_rec, real_rec)
		log.Println("---------------------------------------------------next round---------------------------------------------------")
	}
	// Render one HTML chart per group: online message count vs group size.
	x_zhou := make([]int, len(group_list))
	for i := 0; i < len(group_list); i++ {
		x_zhou[i] = i
	}
	ave_groupsize := make([]int, len(group_size_in_each_round[0]))
	for i, v := range group_list {
		line := charts.NewLine()
		line.SetGlobalOptions(
			charts.WithInitializationOpts(opts.Initialization{Theme: types.ThemeWesteros}),
			charts.WithTitleOpts(opts.Title{
				Title:    ("group_id=" + strconv.Itoa(v.Group_Id)),
				Subtitle: "k_val=" + strconv.Itoa(k_val) + ";delay_limitation=" + strconv.Itoa(delay_limitation) + ";Group_Active_Threshold=" + strconv.Itoa(v.Group_Active_Threshold),
			}))
		log.Println("group_id=", i, "group's group_size", group_size_in_each_round[v.Group_Id])
		if !v.Group_low_frequence {
			// Accumulate per-round group sizes (0 counted as 1).
			for i, v := range group_size_in_each_round[v.Group_Id] {
				if v == 0 {
					ave_groupsize[i] = ave_groupsize[i] + 1
				} else {
					ave_groupsize[i] = ave_groupsize[i] + v
				}
			}
		}
		log.Println("group_id=", i, "group,online_msg_in_each_group_in_each_round", online_msg_in_each_group_in_each_round[v.Group_Id])
		line.SetXAxis(x_zhou).
			AddSeries("Category A", generateLineItems(online_msg_in_each_group_in_each_round[v.Group_Id])).
			AddSeries("Category B", generateLineItems(group_size_in_each_round[v.Group_Id])).
			SetSeriesOptions(charts.WithLineChartOpts(opts.LineChart{Step: true, ConnectNulls: true}))
		name1 := "group_"
		name2 := ".html"
		name := name1 + strconv.Itoa(i) + name2
		w, _ := os.Create(name)
		line.Render(w)
	}
	log.Println(err_result)
	log.Println("k_val=", k_val, "duration=", duration)
	log.Println("arrival phase", arrival_phase_number_list, average_arrival_length)
	// Final summary over regular groups (excluding low-frequency and outlier).
	sum_number := 0
	sum_length := 0
	sum_duration := time.Duration(0)
	sum_send_times := 0
	for _, v := range group_list {
		if !v.Group_low_frequence && !v.Group_outlier_flag {
			log.Println(v.Group_Id, "eliminated users number=", v.Group_Size_copy)
			sum_length = v.Group_online_sum_length + sum_length
			sum_number = sum_number + v.Group_Size_copy
			sum_duration = sum_duration + v.Group_delay_duration_sum
			sum_send_times = sum_send_times + v.Group_send_times
			if v.Group_send_times == 0 {
				log.Println(v.Group_Id)
				continue
			}
		}
	}
	// NOTE(review): divides by sum_number and sum_send_times without zero
	// checks — panics/NaN if no regular group ever sent.
	log.Println("average online phase length=", float64(sum_length)/float64(sum_number), "eliminated users sum number", sum_number, "average latency in online phase", sum_duration/time.Duration(sum_send_times), "check total_cover=,", total_cover, "average cover", float64(total_cover)/float64(sum_number))
	ave_groupsize_double := make([]float64, len(ave_groupsize))
	for i, v := range ave_groupsize {
		ave_groupsize_double[i] = math.Round((float64(v) / float64(k_val-1)))
	}
	log.Println("average group size=", ave_groupsize_double)
	log.Println("group size sum=", ave_groupsize)
}