package main import ( "encoding/json" "fmt" "io" "log" "net" "os" "strconv" "strings" "sync" "time" "github.com/gomodule/redigo/redis" ) var channelSize int var Status map[string]int //store each users' status var status_learning []string //users who are in learning phase //all users and theri ip address var NameList []string var IpList map[string]string //test2 uses this hashmap to get from Sample Data, in each round, there are how many messages var wantToSendHash map[string]int var packetLength int //get connection with redis func GetConn() redis.Conn { conn, err := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379)) if err != nil { fmt.Println("redis err", err) return nil } return conn } //change the interface type into string type func interfaceToString(x interface{}) string { xx, err1 := redis.Strings(x, nil) if err1 != nil { fmt.Println("interfaceToString err", err1) fmt.Println("xx=", xx, "xx[0]=", xx[0]) } xxx := xx[0] return xxx } var times int //used to set ip address //set ip address func setIp() string { str := strconv.Itoa(times) times = times + 1 return str } var rejoin_number int //set ip address for users func getIp(uid string) string { if IpList[uid] != "" { if Status[uid] != 3 { return IpList[uid] } else { //rejoin users rejoin_number++ Status[uid] = 0 x := setIp() IpList[uid] = x db1_key_channel <- uid db1_val_channel <- x return x } } else { //new users NameList = append(NameList, uid) Status[uid] = 0 x := setIp() IpList[uid] = x db1_key_channel <- uid db1_val_channel <- x return x } } //db2, client queue;cache time and messages in clients func pushIntoDB2(uid, time, msg string) { k1 := uid + "_msg" k2 := uid + "_time" db2_msg_key_channel <- k1 db2_time_key_channel <- k2 db2_msg_val_channel <- msg db2_time_val_channel <- time } //4 Variables used in itest2 var User_id_queue string var Msg_queue string var Timestamp_queue string var User_Ip string //used in db1,key is uid; val is ip var db1_key_channel chan string var db1_val_channel chan string var mutex_db1 sync.Mutex func db1_Set_Goroutine() { defer from_db0_input_wg.Done() conn := GetConn() conn.Do("select", 1) for { mutex_db1.Lock() x, ok1 := <-db1_key_channel y, ok2 := <-db1_val_channel mutex_db1.Unlock() if !ok1 || !ok2 { return } conn.Do("set", x, y) } } //db2's channels, used to store timestamp in db2 var db2_time_key_channel chan string var db2_time_val_channel chan string var mutex_db2_time sync.Mutex func db2_Time_Set_Goroutine() { defer from_db0_input_wg.Done() conn := GetConn() conn.Do("select", 2) for { mutex_db2_time.Lock() x, ok1 := <-db2_time_key_channel y, ok2 := <-db2_time_val_channel mutex_db2_time.Unlock() if !ok1 || !ok2 { return } conn.Do("lpush", x, y) } } //db2's channel. used to store messages in db2 var db2_msg_key_channel chan string var db2_msg_val_channel chan string var mutex_db2_msg sync.Mutex func db2_Msg_Set_Goroutine() { defer from_db0_input_wg.Done() conn := GetConn() conn.Do("select", 2) for { mutex_db2_msg.Lock() x, ok1 := <-db2_msg_key_channel y, ok2 := <-db2_msg_val_channel mutex_db2_msg.Unlock() if !ok1 || !ok2 { return } conn.Do("lpush", x, y) } } //test2 is used to get messages from sample data in each round //get all messages in wach hour func test2(finalTime string, lastTimesIndex int) (finalIndex int) { fmt.Println("text2() starts") //6 goroutines init db1_key_channel = make(chan string, channelSize) db1_val_channel = make(chan string, channelSize) db2_time_key_channel = make(chan string, channelSize) db2_time_val_channel = make(chan string, channelSize) db2_msg_key_channel = make(chan string, channelSize) db2_msg_val_channel = make(chan string, channelSize) wantToSendHash = make(map[string]int) conn := GetConn() for i := lastTimesIndex; ; i++ { //Used to determine if a timeout has occurred time1, err1 := conn.Do("hmget", i, "Timestamp") if err1 != nil { fmt.Println("Timestamp hmget err,err1=", err1) } Timestamp_queue = interfaceToString(time1) //timeout if Timestamp_queue > finalTime { finalIndex = i fmt.Println("i", i) break } else { //no timeout u_id, err := conn.Do("hmget", i, "User_id") if err != nil { fmt.Println("u_id hmget err,err1=", err) } User_id_queue = interfaceToString(u_id) wantToSendHash[User_id_queue] = 1 messages, err2 := conn.Do("hmget", i, "Hashtags") if err2 != nil { fmt.Println("message hmget err,err2=", err2) } Msg_queue = interfaceToString(messages) User_Ip = getIp(User_id_queue) pushIntoDB2(User_id_queue, Timestamp_queue, Msg_queue) } } fmt.Println("for loop ends") //until now, all messages in this round have been taken from sample data; for i := 0; i < 130; i++ { from_db0_input_wg.Add(3) go db1_Set_Goroutine() go db2_Time_Set_Goroutine() go db2_Msg_Set_Goroutine() } close(db1_key_channel) close(db1_val_channel) close(db2_time_key_channel) close(db2_time_val_channel) close(db2_msg_key_channel) close(db2_msg_val_channel) from_db0_input_wg.Wait() fmt.Println("text2() ends") return } //used to porcess these messages from taxt2() var real_msg_channel chan string var cover_msg_channel chan string //used to merged messages type megred struct { Megred_Timestamp []string Megred_Hashtags []string Megred_User_Id string Megred_Ip string Cover_Msg string //if it is a cover message } type users struct { Uid string Cover int Delay int Queue []string } type group struct { Group_id int Users_list []*users // Pattern []int // } var from_db0_input_wg sync.WaitGroup //used in sendOut_real; put all uid in wantToSendHash into real_msg_channel func putWantToSendHashIntoChannel(real_msg_channel chan<- string) { fmt.Println("len(wantToSendHash)", len(wantToSendHash)) for i := range wantToSendHash { real_msg_channel <- i } fmt.Println("already pu all uid in wantToSendHash into channel") } //used in sendOut_cov; all uid in status_learning should send message(cover/real); if they don't have real. then they must send cover message func putStatus_learningIntoChannel(cover_msg_channel chan<- string) { fmt.Println("len(status_learning)", len(status_learning)) for _, v := range status_learning { cover_msg_channel <- v } fmt.Println("already all uid in status_learning into channel") } //connect to agent func getHttpConnection() net.Conn { httpConn, err := net.Dial("tcp", "192.168.32.144:20000") if err != nil { fmt.Println("http conn err,dial 192.168.32.144:20000 failed", err) return nil } return httpConn } //taken all message in db2, merge them into structure func merge(v string, conn redis.Conn) *megred { conn.Do("select", "2") a := v + "_time" b := v + "_msg" l1, err_L1 := conn.Do("LLEN", a) if err_L1 != nil { fmt.Println("err_L1=", err_L1) } length1, err_length1 := redis.Int(l1, nil) if err_length1 != nil { fmt.Println("err_length1=", err_length1) } l2, err_L2 := conn.Do("LLEN", b) if err_L2 != nil { fmt.Println("err_L2=", err_L2) } length2, err_length2 := redis.Int(l2, nil) if err_length2 != nil { fmt.Println("err_length2=", err_length2) } time_middle, _ := conn.Do("lrange", a, 0, length1-1) x, err_x := redis.Strings(time_middle, nil) if err_x != nil { fmt.Println("err_x=", err_x) } aa := x msg_middle, _ := conn.Do("lrange", b, 0, length2-1) y, errr_y := redis.Strings(msg_middle, nil) if errr_y != nil { fmt.Println("errr_y=", errr_y) } bb := y //clean two queue for i := 0; i < length1; i++ { conn.Do("rpop", a) } for i := 0; i < length2; i++ { conn.Do("rpop", b) } //do merging ip_middle := IpList[v] merged_result := &megred{ aa, bb, v, ip_middle, "no", //only merge real messages; so not a cover } return merged_result } var cov_lock sync.Mutex var covNumber int var real_lock sync.Mutex var realNumber int var arrival_msg_number int var learning_msg_number int var online_msg_number int var learning_msg_number_wg sync.Mutex var online_msg_number_wg sync.Mutex var test_list []string //send real messages out func sendOut_real(list <-chan string, c net.Conn, cover *megred) { defer finalSend_wg_real.Done() time.Sleep(time.Millisecond * 100) conn := GetConn() conn.Do("select", "2") for { v, ok := <-list if !ok { return } status_v := Status[v] switch status_v { case 0: arrival_msg_number++ case 1: learning_msg_number_wg.Lock() learning_msg_number++ learning_msg_number_wg.Unlock() case 2: online_msg_number_wg.Lock() online_msg_number++ test_list = append(test_list, v) online_msg_number_wg.Unlock() } rst := merge(v, conn) //get merged message jsons, err_json := json.Marshal(*rst) if err_json != nil { fmt.Println("err_json=", err_json) } //do padding total_length := packetLength padding_length := total_length - len(jsons) if jsons[len(jsons)-1] != 125 { fmt.Println("rst=", rst) break } //it is too long, we use a cover message for this too loooooong real message if padding_length <= 0 { cov_mid := *cover cov_mid.Megred_User_Id = v cov_mid.Cover_Msg = "no" cov_mid.Megred_Hashtags = []string{"[other]"} cov_mid.Megred_Ip = IpList[v] cov_mid.Megred_Timestamp = []string{"000000000"} jsons, err_json := json.Marshal(cov_mid) if err_json != nil { fmt.Println("err_json=", err_json) } total_length := packetLength padding_length := total_length - len(jsons) pd := make([]byte, padding_length) jsons = append(jsons, pd...) if len(jsons) != packetLength { fmt.Println("err,cover=", jsons) } c.Write(jsons) real_lock.Lock() realNumber++ real_lock.Unlock() continue } pd := make([]byte, padding_length) jsons = append(jsons, pd...) if len(jsons) != packetLength { fmt.Println("err,rst=", rst) } c.Write(jsons) real_lock.Lock() realNumber++ real_lock.Unlock() } } //used to send out cover messages func sendOut_cover(list <-chan string, c net.Conn, cover *megred) { defer finalSend_wg_cov.Done() time.Sleep(time.Millisecond * 100) for { v, ok := <-list if !ok { break } else { if wantToSendHash[v] == 1 { continue //user in learning phase and has a real message in this round.then he don's need to send cover message } else { //should send cover message status_v := Status[v] switch status_v { case 1: learning_msg_number_wg.Lock() learning_msg_number++ learning_msg_number_wg.Unlock() } cov_mid := *cover cov_mid.Megred_User_Id = v cov_mid.Cover_Msg = "yes" cov_mid.Megred_Hashtags = []string{"[cover]"} cov_mid.Megred_Ip = IpList[v] cov_mid.Megred_Timestamp = []string{"000000000"} jsons, err_json := json.Marshal(cov_mid) if err_json != nil { fmt.Println("err_json=", err_json) } total_length := packetLength padding_length := total_length - len(jsons) pd := make([]byte, padding_length) jsons = append(jsons, pd...) if len(jsons) != packetLength { fmt.Println("err,cover=", jsons) } c.Write(jsons) cov_lock.Lock() covNumber++ cov_lock.Unlock() } } } } //tell agent, this round ends, it also needs to be padded func roundEnd(c net.Conn) { fmt.Println("send roundEnd") x := []byte("roundEnd") total_length := packetLength padding_length := total_length - len(x) pd := make([]byte, padding_length) x = append(x, pd...) if len(x) != packetLength { fmt.Println("err,x=", x) } c.Write(x) } var finalSend_wg_cov sync.WaitGroup var finalSend_wg_real sync.WaitGroup //call all send actions func finalSend() { c := getHttpConnection() cover := &megred{ []string{}, []string{}, "", "cover", "yes", } real_msg_channel = make(chan string) cover_msg_channel = make(chan string) for i := 0; i < 1000; i++ { finalSend_wg_real.Add(1) finalSend_wg_cov.Add(1) go sendOut_real(real_msg_channel, c, cover) go sendOut_cover(cover_msg_channel, c, cover) } putWantToSendHashIntoChannel(real_msg_channel) putStatus_learningIntoChannel(cover_msg_channel) close(real_msg_channel) close(cover_msg_channel) finalSend_wg_real.Wait() finalSend_wg_cov.Wait() time.Sleep(time.Millisecond * 100) roundEnd(c) } //init db1,2 func initDB1_DB2() { conn := GetConn() conn.Do("select", "1") conn.Do("flushdb") conn.Do("keys *") conn.Do("select", "2") conn.Do("flushdb") conn.Do("keys *") } //some init works func start() { initDB1_DB2() times = 0 IpList = make(map[string]string) Status = make(map[string]int) NameList = make([]string, 0) status_learning = make([]string, 0) packetLength = 3000 } //Calculate the time, because text2 takes messages within one hour each time, and needs this time as deadline func timeAddOneHour(input string) (outpute string) { loc, err_loc := time.LoadLocation("Local") if err_loc != nil { fmt.Println("err_loc=", err_loc) } the_time, err_the_time := time.ParseInLocation("20060102150405", input, loc) if err_the_time != nil { fmt.Println("err_the_time=", err_the_time) } //1 hour; 2 hours; 30 min h, _ := time.ParseDuration("1h") h1 := the_time.Add(1 * h) //h, _ := time.ParseDuration("1m") //h1 := the_time.Add(30 * h) //h, _ := time.ParseDuration("1h") //h1 := the_time.Add(2 * h) timeString := h1.Format("20060102150405") outpute = timeString return } var learning_phase_start_round int var round int //reaceive the feedback from agent; this is for users from arrival to learning func StatusChangeOrNot_arrival_to_learning() { defer log.Println("StatusChangeOrNot_arrival_to_learning() ends") fmt.Println("starts StatusChangeOrNot_arrival_to_learning()") listener, err := net.Listen("tcp", "192.168.32.144:20002") if err != nil { fmt.Println("StatusChangeOrNot_arrival_to_learning(),as server,listen 192.168.32.144:20002 fails", err) return } conn, err := listener.Accept() if err != nil { fmt.Println("accept failed,StatusChangeOrNot_arrival_to_learning conn err", err) return } var tmp [10000]byte res := "" log.Println("all users who have changed their status") for { n, err := conn.Read(tmp[:]) if err != nil { fmt.Println("read from conn failed", err) break } //no change, directly return if string(tmp[:8]) == "noChange" { log.Println("StatusChangeOrNot_arrival_to_learning ends,no user changes status") listener.Close() return } else { result := string(tmp[:n]) res = res + result if string(tmp[:9]) == "changeEnd" { fmt.Println("received all status") listener.Close() break } } } //space res = strings.Replace(res, " ", "", -1) //line res = strings.Replace(res, "\n", "", -1) str_arr := strings.Split(res, ",") learning_phase_start_round = round for _, str := range str_arr { Status[str] = 1 status_learning = append(status_learning, str) } fmt.Println("StatusChangeOrNot_arrival_to_learning ends。someone has changed his status, len(status_learning)=", len(status_learning)) } //feedback, users who have been eliminated in online phase func StatusChangeOrNot_eliminated() { defer fmt.Println("StatusChangeOrNot_eliminated ends") log.Println("start StatusChangeOrNot_eliminated()") listener, err := net.Listen("tcp", "192.168.32.144:20002") if err != nil { fmt.Println("StatusChangeOrNot_eliminated(),as server,listen 192.168.32.144:20002 failed", err) return } conn, err := listener.Accept() if err != nil { fmt.Println("accept failed,StatusChangeOrNot_eliminated conn err", err) return } var tmp [10000]byte res := "" log.Println("eliminated uid") for { n, err := conn.Read(tmp[:]) if err != nil { fmt.Println("read from conn failed", err) break } //no change if string(tmp[:8]) == "noChange" { fmt.Println("StatusChangeOrNot_eliminatedends,no change") listener.Close() return } else { result := string(tmp[:n]) res = res + result if string(tmp[:9]) == "changeEnd" { fmt.Println("received all status") listener.Close() break } } } //space res = strings.Replace(res, " ", "", -1) //line res = strings.Replace(res, "\n", "", -1) str_arr := strings.Split(res, ",") //set status =3, which means eliminated for _, str := range str_arr { Status[str] = 3 } log.Println("StatusChangeOrNot_eliminatedends,len(str_arr)=", len(str_arr)) log.Println("the eliminated users are", str_arr) } //Learning phase's length,24/48/72 rounds func refresh_status_learning() { if round-learning_phase_start_round == 24 { //24,48,72 fmt.Println("start refresh_status_learning") fmt.Println("status_learning is refreshed,len(status_learning)=", len(status_learning)) status_learning = make([]string, 0) } } //users from learning to online phase func StatusChangeOrNot_learning_to_online() { defer log.Println("StatusChangeOrNot_learning_to_online() ends") log.Println("进入StatusChangeOrNot_learning_to_online() starts") listener, err := net.Listen("tcp", "192.168.32.144:20002") if err != nil { fmt.Println("StatusChangeOrNot_learning_to_online(),as server,listen 192.168.32.144:20002 failed", err) return } conn, err := listener.Accept() if err != nil { fmt.Println("accept failed", err) return } var tmp [10000]byte res := "" for { n, err := conn.Read(tmp[:]) if err != nil { fmt.Println("read from conn failed", err) break } //no change if string(tmp[:8]) == "noChange" { log.Println("no change") listener.Close() return } else { result := string(tmp[:n]) res = res + result if string(tmp[:9]) == "changeEnd" { fmt.Println("received all status") listener.Close() break } } } //space res = strings.Replace(res, " ", "", -1) //line res = strings.Replace(res, "\n", "", -1) str_arr := strings.Split(res, ",") //set status to 2,namely online number := 0 for i := 0; i < len(str_arr)-1; i++ { Status[str_arr[i]] = 2 number++ } fmt.Println("from learning to online users' number=", number) fmt.Println("someone has changed into online phase") } func main() { //log logFile, err := os.OpenFile("log.txt", os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666) if err != nil { panic(err) } defer logFile.Close() mw := io.MultiWriter(os.Stdout, logFile) log.SetOutput(mw) start() channelSize = 150000 //1st test point,1101 lastTimesIndex := 0 roundEndTime := "20121101130000" //2ed test point,1110 //lastTimesIndex := 1479975 //roundEndTime := "20121110130000" //3rd test point,1120 //lastTimesIndex := 3751916 //roundEndTime := "20121120130000" for i := 0; i < 35; i++ { fmt.Println("------------------------------------------------------------the", i, "th round starts------------------------------------------------------------------") arrival_msg_number = 0 learning_msg_number = 0 online_msg_number = 0 rejoin_number = 0 round = i covNumber = 0 realNumber = 0 lastTimesIndex = test2(roundEndTime, lastTimesIndex) fmt.Println("lastTimesIndex=", lastTimesIndex) roundEndTime = timeAddOneHour(roundEndTime) fmt.Println("roundEndTime=", roundEndTime) fmt.Println("status len", len(Status), "NameListlen ", len(NameList), "status_learning len", len(status_learning)) finalSend() refresh_status_learning() fmt.Println("Status len", len(Status)) StatusChangeOrNot_arrival_to_learning() StatusChangeOrNot_learning_to_online() StatusChangeOrNot_eliminated() fmt.Println("covNumber=", covNumber) fmt.Println("realNumber=", realNumber) fmt.Println("arrival_msg_number=", arrival_msg_number, "learning_msg_number=", learning_msg_number, "online_msg_number=", online_msg_number) log.Println("rejoin_number", rejoin_number) fmt.Println("------------------------------------------------------------the", i, "th round ends------------------------------------------------------------------") } }