|
- package main
- import (
- "encoding/json"
- "fmt"
- "io"
- "log"
- "net"
- "os"
- "strconv"
- "strings"
- "sync"
- "time"
- "github.com/gomodule/redigo/redis"
- )
- var channelSize int
- var Status map[string]int //store each users' status
- var status_learning []string //users who are in learning phase
- //all users and theri ip address
- var NameList []string
- var IpList map[string]string
- //test2 uses this hashmap to get from Sample Data, in each round, there are how many messages
- var wantToSendHash map[string]int
- var packetLength int
- //get connection with redis
- func GetConn() redis.Conn {
- conn, err := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
- if err != nil {
- fmt.Println("redis err", err)
- return nil
- }
- return conn
- }
- //change the interface type into string type
- func interfaceToString(x interface{}) string {
- xx, err1 := redis.Strings(x, nil)
- if err1 != nil {
- fmt.Println("interfaceToString err", err1)
- fmt.Println("xx=", xx, "xx[0]=", xx[0])
- }
- xxx := xx[0]
- return xxx
- }
- var times int //used to set ip address
- //set ip address
- func setIp() string {
- str := strconv.Itoa(times)
- times = times + 1
- return str
- }
- var rejoin_number int
- //set ip address for users
- func getIp(uid string) string {
- if IpList[uid] != "" {
- if Status[uid] != 3 {
- return IpList[uid]
- } else { //rejoin users
- rejoin_number++
- Status[uid] = 0
- x := setIp()
- IpList[uid] = x
- db1_key_channel <- uid
- db1_val_channel <- x
- return x
- }
- } else { //new users
- NameList = append(NameList, uid)
- Status[uid] = 0
- x := setIp()
- IpList[uid] = x
- db1_key_channel <- uid
- db1_val_channel <- x
- return x
- }
- }
- //db2, client queue;cache time and messages in clients
- func pushIntoDB2(uid, time, msg string) {
- k1 := uid + "_msg"
- k2 := uid + "_time"
- db2_msg_key_channel <- k1
- db2_time_key_channel <- k2
- db2_msg_val_channel <- msg
- db2_time_val_channel <- time
- }
- //4 Variables used in itest2
- var User_id_queue string
- var Msg_queue string
- var Timestamp_queue string
- var User_Ip string
- //used in db1,key is uid; val is ip
- var db1_key_channel chan string
- var db1_val_channel chan string
- var mutex_db1 sync.Mutex
- func db1_Set_Goroutine() {
- defer from_db0_input_wg.Done()
- conn := GetConn()
- conn.Do("select", 1)
- for {
- mutex_db1.Lock()
- x, ok1 := <-db1_key_channel
- y, ok2 := <-db1_val_channel
- mutex_db1.Unlock()
- if !ok1 || !ok2 {
- return
- }
- conn.Do("set", x, y)
- }
- }
- //db2's channels, used to store timestamp in db2
- var db2_time_key_channel chan string
- var db2_time_val_channel chan string
- var mutex_db2_time sync.Mutex
- func db2_Time_Set_Goroutine() {
- defer from_db0_input_wg.Done()
- conn := GetConn()
- conn.Do("select", 2)
- for {
- mutex_db2_time.Lock()
- x, ok1 := <-db2_time_key_channel
- y, ok2 := <-db2_time_val_channel
- mutex_db2_time.Unlock()
- if !ok1 || !ok2 {
- return
- }
- conn.Do("lpush", x, y)
- }
- }
- //db2's channel. used to store messages in db2
- var db2_msg_key_channel chan string
- var db2_msg_val_channel chan string
- var mutex_db2_msg sync.Mutex
- func db2_Msg_Set_Goroutine() {
- defer from_db0_input_wg.Done()
- conn := GetConn()
- conn.Do("select", 2)
- for {
- mutex_db2_msg.Lock()
- x, ok1 := <-db2_msg_key_channel
- y, ok2 := <-db2_msg_val_channel
- mutex_db2_msg.Unlock()
- if !ok1 || !ok2 {
- return
- }
- conn.Do("lpush", x, y)
- }
- }
- //test2 is used to get messages from sample data in each round
- //get all messages in wach hour
- func test2(finalTime string, lastTimesIndex int) (finalIndex int) {
- fmt.Println("text2() starts")
- //6 goroutines init
- db1_key_channel = make(chan string, channelSize)
- db1_val_channel = make(chan string, channelSize)
- db2_time_key_channel = make(chan string, channelSize)
- db2_time_val_channel = make(chan string, channelSize)
- db2_msg_key_channel = make(chan string, channelSize)
- db2_msg_val_channel = make(chan string, channelSize)
- wantToSendHash = make(map[string]int)
- conn := GetConn()
- for i := lastTimesIndex; ; i++ {
- //Used to determine if a timeout has occurred
- time1, err1 := conn.Do("hmget", i, "Timestamp")
- if err1 != nil {
- fmt.Println("Timestamp hmget err,err1=", err1)
- }
- Timestamp_queue = interfaceToString(time1)
- //timeout
- if Timestamp_queue > finalTime {
- finalIndex = i
- fmt.Println("i", i)
- break
- } else {
- //no timeout
- u_id, err := conn.Do("hmget", i, "User_id")
- if err != nil {
- fmt.Println("u_id hmget err,err1=", err)
- }
- User_id_queue = interfaceToString(u_id)
- wantToSendHash[User_id_queue] = 1
- messages, err2 := conn.Do("hmget", i, "Hashtags")
- if err2 != nil {
- fmt.Println("message hmget err,err2=", err2)
- }
- Msg_queue = interfaceToString(messages)
- User_Ip = getIp(User_id_queue)
- pushIntoDB2(User_id_queue, Timestamp_queue, Msg_queue)
- }
- }
- fmt.Println("for loop ends")
- //until now, all messages in this round have been taken from sample data;
- for i := 0; i < 130; i++ {
- from_db0_input_wg.Add(3)
- go db1_Set_Goroutine()
- go db2_Time_Set_Goroutine()
- go db2_Msg_Set_Goroutine()
- }
- close(db1_key_channel)
- close(db1_val_channel)
- close(db2_time_key_channel)
- close(db2_time_val_channel)
- close(db2_msg_key_channel)
- close(db2_msg_val_channel)
- from_db0_input_wg.Wait()
- fmt.Println("text2() ends")
- return
- }
- //used to porcess these messages from taxt2()
- var real_msg_channel chan string
- var cover_msg_channel chan string
- //used to merged messages
- type megred struct {
- Megred_Timestamp []string
- Megred_Hashtags []string
- Megred_User_Id string
- Megred_Ip string
- Cover_Msg string //if it is a cover message
- }
- type users struct {
- Uid string
- Cover int
- Delay int
- Queue []string
- }
- type group struct {
- Group_id int
- Users_list []*users //
- Pattern []int //
- }
- var from_db0_input_wg sync.WaitGroup
- //used in sendOut_real; put all uid in wantToSendHash into real_msg_channel
- func putWantToSendHashIntoChannel(real_msg_channel chan<- string) {
- fmt.Println("len(wantToSendHash)", len(wantToSendHash))
- for i := range wantToSendHash {
- real_msg_channel <- i
- }
- fmt.Println("already pu all uid in wantToSendHash into channel")
- }
- //used in sendOut_cov; all uid in status_learning should send message(cover/real); if they don't have real. then they must send cover message
- func putStatus_learningIntoChannel(cover_msg_channel chan<- string) {
- fmt.Println("len(status_learning)", len(status_learning))
- for _, v := range status_learning {
- cover_msg_channel <- v
- }
- fmt.Println("already all uid in status_learning into channel")
- }
- //connect to agent
- func getHttpConnection() net.Conn {
- httpConn, err := net.Dial("tcp", "192.168.32.144:20000")
- if err != nil {
- fmt.Println("http conn err,dial 192.168.32.144:20000 failed", err)
- return nil
- }
- return httpConn
- }
- //taken all message in db2, merge them into structure
- func merge(v string, conn redis.Conn) *megred {
- conn.Do("select", "2")
- a := v + "_time"
- b := v + "_msg"
- l1, err_L1 := conn.Do("LLEN", a)
- if err_L1 != nil {
- fmt.Println("err_L1=", err_L1)
- }
- length1, err_length1 := redis.Int(l1, nil)
- if err_length1 != nil {
- fmt.Println("err_length1=", err_length1)
- }
- l2, err_L2 := conn.Do("LLEN", b)
- if err_L2 != nil {
- fmt.Println("err_L2=", err_L2)
- }
- length2, err_length2 := redis.Int(l2, nil)
- if err_length2 != nil {
- fmt.Println("err_length2=", err_length2)
- }
- time_middle, _ := conn.Do("lrange", a, 0, length1-1)
- x, err_x := redis.Strings(time_middle, nil)
- if err_x != nil {
- fmt.Println("err_x=", err_x)
- }
- aa := x
- msg_middle, _ := conn.Do("lrange", b, 0, length2-1)
- y, errr_y := redis.Strings(msg_middle, nil)
- if errr_y != nil {
- fmt.Println("errr_y=", errr_y)
- }
- bb := y
- //clean two queue
- for i := 0; i < length1; i++ {
- conn.Do("rpop", a)
- }
- for i := 0; i < length2; i++ {
- conn.Do("rpop", b)
- }
- //do merging
- ip_middle := IpList[v]
- merged_result := &megred{
- aa,
- bb,
- v,
- ip_middle,
- "no", //only merge real messages; so not a cover
- }
- return merged_result
- }
- var cov_lock sync.Mutex
- var covNumber int
- var real_lock sync.Mutex
- var realNumber int
- var arrival_msg_number int
- var learning_msg_number int
- var online_msg_number int
- var learning_msg_number_wg sync.Mutex
- var online_msg_number_wg sync.Mutex
- var test_list []string
- //send real messages out
- func sendOut_real(list <-chan string, c net.Conn, cover *megred) {
- defer finalSend_wg_real.Done()
- time.Sleep(time.Millisecond * 100)
- conn := GetConn()
- conn.Do("select", "2")
- for {
- v, ok := <-list
- if !ok {
- return
- }
- status_v := Status[v]
- switch status_v {
- case 0:
- arrival_msg_number++
- case 1:
- learning_msg_number_wg.Lock()
- learning_msg_number++
- learning_msg_number_wg.Unlock()
- case 2:
- online_msg_number_wg.Lock()
- online_msg_number++
- test_list = append(test_list, v)
- online_msg_number_wg.Unlock()
- }
- rst := merge(v, conn) //get merged message
- jsons, err_json := json.Marshal(*rst)
- if err_json != nil {
- fmt.Println("err_json=", err_json)
- }
- //do padding
- total_length := packetLength
- padding_length := total_length - len(jsons)
- if jsons[len(jsons)-1] != 125 {
- fmt.Println("rst=", rst)
- break
- }
- //it is too long, we use a cover message for this too loooooong real message
- if padding_length <= 0 {
- cov_mid := *cover
- cov_mid.Megred_User_Id = v
- cov_mid.Cover_Msg = "no"
- cov_mid.Megred_Hashtags = []string{"[other]"}
- cov_mid.Megred_Ip = IpList[v]
- cov_mid.Megred_Timestamp = []string{"000000000"}
- jsons, err_json := json.Marshal(cov_mid)
- if err_json != nil {
- fmt.Println("err_json=", err_json)
- }
- total_length := packetLength
- padding_length := total_length - len(jsons)
- pd := make([]byte, padding_length)
- jsons = append(jsons, pd...)
- if len(jsons) != packetLength {
- fmt.Println("err,cover=", jsons)
- }
- c.Write(jsons)
- real_lock.Lock()
- realNumber++
- real_lock.Unlock()
- continue
- }
- pd := make([]byte, padding_length)
- jsons = append(jsons, pd...)
- if len(jsons) != packetLength {
- fmt.Println("err,rst=", rst)
- }
- c.Write(jsons)
- real_lock.Lock()
- realNumber++
- real_lock.Unlock()
- }
- }
- //used to send out cover messages
- func sendOut_cover(list <-chan string, c net.Conn, cover *megred) {
- defer finalSend_wg_cov.Done()
- time.Sleep(time.Millisecond * 100)
- for {
- v, ok := <-list
- if !ok {
- break
- } else {
- if wantToSendHash[v] == 1 {
- continue //user in learning phase and has a real message in this round.then he don's need to send cover message
- } else {
- //should send cover message
- status_v := Status[v]
- switch status_v {
- case 1:
- learning_msg_number_wg.Lock()
- learning_msg_number++
- learning_msg_number_wg.Unlock()
- }
- cov_mid := *cover
- cov_mid.Megred_User_Id = v
- cov_mid.Cover_Msg = "yes"
- cov_mid.Megred_Hashtags = []string{"[cover]"}
- cov_mid.Megred_Ip = IpList[v]
- cov_mid.Megred_Timestamp = []string{"000000000"}
- jsons, err_json := json.Marshal(cov_mid)
- if err_json != nil {
- fmt.Println("err_json=", err_json)
- }
- total_length := packetLength
- padding_length := total_length - len(jsons)
- pd := make([]byte, padding_length)
- jsons = append(jsons, pd...)
- if len(jsons) != packetLength {
- fmt.Println("err,cover=", jsons)
- }
- c.Write(jsons)
- cov_lock.Lock()
- covNumber++
- cov_lock.Unlock()
- }
- }
- }
- }
- //tell agent, this round ends, it also needs to be padded
- func roundEnd(c net.Conn) {
- fmt.Println("send roundEnd")
- x := []byte("roundEnd")
- total_length := packetLength
- padding_length := total_length - len(x)
- pd := make([]byte, padding_length)
- x = append(x, pd...)
- if len(x) != packetLength {
- fmt.Println("err,x=", x)
- }
- c.Write(x)
- }
- var finalSend_wg_cov sync.WaitGroup
- var finalSend_wg_real sync.WaitGroup
- //call all send actions
- func finalSend() {
- c := getHttpConnection()
- cover := &megred{
- []string{},
- []string{},
- "",
- "cover",
- "yes",
- }
- real_msg_channel = make(chan string)
- cover_msg_channel = make(chan string)
- for i := 0; i < 1000; i++ {
- finalSend_wg_real.Add(1)
- finalSend_wg_cov.Add(1)
- go sendOut_real(real_msg_channel, c, cover)
- go sendOut_cover(cover_msg_channel, c, cover)
- }
- putWantToSendHashIntoChannel(real_msg_channel)
- putStatus_learningIntoChannel(cover_msg_channel)
- close(real_msg_channel)
- close(cover_msg_channel)
- finalSend_wg_real.Wait()
- finalSend_wg_cov.Wait()
- time.Sleep(time.Millisecond * 100)
- roundEnd(c)
- }
- //init db1,2
- func initDB1_DB2() {
- conn := GetConn()
- conn.Do("select", "1")
- conn.Do("flushdb")
- conn.Do("keys *")
- conn.Do("select", "2")
- conn.Do("flushdb")
- conn.Do("keys *")
- }
- //some init works
- func start() {
- initDB1_DB2()
- times = 0
- IpList = make(map[string]string)
- Status = make(map[string]int)
- NameList = make([]string, 0)
- status_learning = make([]string, 0)
- packetLength = 3000
- }
- //Calculate the time, because text2 takes messages within one hour each time, and needs this time as deadline
- func timeAddOneHour(input string) (outpute string) {
- loc, err_loc := time.LoadLocation("Local")
- if err_loc != nil {
- fmt.Println("err_loc=", err_loc)
- }
- the_time, err_the_time := time.ParseInLocation("20060102150405", input, loc)
- if err_the_time != nil {
- fmt.Println("err_the_time=", err_the_time)
- }
- //1 hour; 2 hours; 30 min
- h, _ := time.ParseDuration("1h")
- h1 := the_time.Add(1 * h)
- //h, _ := time.ParseDuration("1m")
- //h1 := the_time.Add(30 * h)
- //h, _ := time.ParseDuration("1h")
- //h1 := the_time.Add(2 * h)
- timeString := h1.Format("20060102150405")
- outpute = timeString
- return
- }
- var learning_phase_start_round int
- var round int
- //reaceive the feedback from agent; this is for users from arrival to learning
- func StatusChangeOrNot_arrival_to_learning() {
- defer log.Println("StatusChangeOrNot_arrival_to_learning() ends")
- fmt.Println("starts StatusChangeOrNot_arrival_to_learning()")
- listener, err := net.Listen("tcp", "192.168.32.144:20002")
- if err != nil {
- fmt.Println("StatusChangeOrNot_arrival_to_learning(),as server,listen 192.168.32.144:20002 fails", err)
- return
- }
- conn, err := listener.Accept()
- if err != nil {
- fmt.Println("accept failed,StatusChangeOrNot_arrival_to_learning conn err", err)
- return
- }
- var tmp [10000]byte
- res := ""
- log.Println("all users who have changed their status")
- for {
- n, err := conn.Read(tmp[:])
- if err != nil {
- fmt.Println("read from conn failed", err)
- break
- }
- //no change, directly return
- if string(tmp[:8]) == "noChange" {
- log.Println("StatusChangeOrNot_arrival_to_learning ends,no user changes status")
- listener.Close()
- return
- } else {
- result := string(tmp[:n])
- res = res + result
- if string(tmp[:9]) == "changeEnd" {
- fmt.Println("received all status")
- listener.Close()
- break
- }
- }
- }
- //space
- res = strings.Replace(res, " ", "", -1)
- //line
- res = strings.Replace(res, "\n", "", -1)
- str_arr := strings.Split(res, ",")
- learning_phase_start_round = round
- for _, str := range str_arr {
- Status[str] = 1
- status_learning = append(status_learning, str)
- }
- fmt.Println("StatusChangeOrNot_arrival_to_learning ends。someone has changed his status, len(status_learning)=", len(status_learning))
- }
- //feedback, users who have been eliminated in online phase
- func StatusChangeOrNot_eliminated() {
- defer fmt.Println("StatusChangeOrNot_eliminated ends")
- log.Println("start StatusChangeOrNot_eliminated()")
- listener, err := net.Listen("tcp", "192.168.32.144:20002")
- if err != nil {
- fmt.Println("StatusChangeOrNot_eliminated(),as server,listen 192.168.32.144:20002 failed", err)
- return
- }
- conn, err := listener.Accept()
- if err != nil {
- fmt.Println("accept failed,StatusChangeOrNot_eliminated conn err", err)
- return
- }
- var tmp [10000]byte
- res := ""
- log.Println("eliminated uid")
- for {
- n, err := conn.Read(tmp[:])
- if err != nil {
- fmt.Println("read from conn failed", err)
- break
- }
- //no change
- if string(tmp[:8]) == "noChange" {
- fmt.Println("StatusChangeOrNot_eliminatedends,no change")
- listener.Close()
- return
- } else {
- result := string(tmp[:n])
- res = res + result
- if string(tmp[:9]) == "changeEnd" {
- fmt.Println("received all status")
- listener.Close()
- break
- }
- }
- }
- //space
- res = strings.Replace(res, " ", "", -1)
- //line
- res = strings.Replace(res, "\n", "", -1)
- str_arr := strings.Split(res, ",")
- //set status =3, which means eliminated
- for _, str := range str_arr {
- Status[str] = 3
- }
- log.Println("StatusChangeOrNot_eliminatedends,len(str_arr)=", len(str_arr))
- log.Println("the eliminated users are", str_arr)
- }
- //Learning phase's length,24/48/72 rounds
- func refresh_status_learning() {
- if round-learning_phase_start_round == 24 { //24,48,72
- fmt.Println("start refresh_status_learning")
- fmt.Println("status_learning is refreshed,len(status_learning)=", len(status_learning))
- status_learning = make([]string, 0)
- }
- }
- //users from learning to online phase
- func StatusChangeOrNot_learning_to_online() {
- defer log.Println("StatusChangeOrNot_learning_to_online() ends")
- log.Println("进入StatusChangeOrNot_learning_to_online() starts")
- listener, err := net.Listen("tcp", "192.168.32.144:20002")
- if err != nil {
- fmt.Println("StatusChangeOrNot_learning_to_online(),as server,listen 192.168.32.144:20002 failed", err)
- return
- }
- conn, err := listener.Accept()
- if err != nil {
- fmt.Println("accept failed", err)
- return
- }
- var tmp [10000]byte
- res := ""
- for {
- n, err := conn.Read(tmp[:])
- if err != nil {
- fmt.Println("read from conn failed", err)
- break
- }
- //no change
- if string(tmp[:8]) == "noChange" {
- log.Println("no change")
- listener.Close()
- return
- } else {
- result := string(tmp[:n])
- res = res + result
- if string(tmp[:9]) == "changeEnd" {
- fmt.Println("received all status")
- listener.Close()
- break
- }
- }
- }
- //space
- res = strings.Replace(res, " ", "", -1)
- //line
- res = strings.Replace(res, "\n", "", -1)
- str_arr := strings.Split(res, ",")
- //set status to 2,namely online
- number := 0
- for i := 0; i < len(str_arr)-1; i++ {
- Status[str_arr[i]] = 2
- number++
- }
- fmt.Println("from learning to online users' number=", number)
- fmt.Println("someone has changed into online phase")
- }
- func main() {
- //log
- logFile, err := os.OpenFile("log.txt", os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666)
- if err != nil {
- panic(err)
- }
- defer logFile.Close()
- mw := io.MultiWriter(os.Stdout, logFile)
- log.SetOutput(mw)
- start()
- channelSize = 150000
- //1st test point,1101
- lastTimesIndex := 0
- roundEndTime := "20121101130000"
- //2ed test point,1110
- //lastTimesIndex := 1479975
- //roundEndTime := "20121110130000"
- //3rd test point,1120
- //lastTimesIndex := 3751916
- //roundEndTime := "20121120130000"
- for i := 0; i < 35; i++ {
- fmt.Println("------------------------------------------------------------the", i, "th round starts------------------------------------------------------------------")
- arrival_msg_number = 0
- learning_msg_number = 0
- online_msg_number = 0
- rejoin_number = 0
- round = i
- covNumber = 0
- realNumber = 0
- lastTimesIndex = test2(roundEndTime, lastTimesIndex)
- fmt.Println("lastTimesIndex=", lastTimesIndex)
- roundEndTime = timeAddOneHour(roundEndTime)
- fmt.Println("roundEndTime=", roundEndTime)
- fmt.Println("status len", len(Status), "NameListlen ", len(NameList), "status_learning len", len(status_learning))
- finalSend()
- refresh_status_learning()
- fmt.Println("Status len", len(Status))
- StatusChangeOrNot_arrival_to_learning()
- StatusChangeOrNot_learning_to_online()
- StatusChangeOrNot_eliminated()
- fmt.Println("covNumber=", covNumber)
- fmt.Println("realNumber=", realNumber)
- fmt.Println("arrival_msg_number=", arrival_msg_number, "learning_msg_number=", learning_msg_number, "online_msg_number=", online_msg_number)
- log.Println("rejoin_number", rejoin_number)
- fmt.Println("------------------------------------------------------------the", i, "th round ends------------------------------------------------------------------")
- }
- }
|