12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013 |
- package main
- import (
- "encoding/json"
- "fmt"
- "io"
- "io/ioutil"
- "log"
- "net"
- "os"
- "strconv"
- "strings"
- "sync"
- "time"
- "github.com/gomodule/redigo/redis"
- )
// Package-level state shared by the ingest stage (test2) and the send stage.
var (
	// channelSize is the buffer capacity of the Redis writer channels that
	// test2 creates.
	channelSize int

	// Status maps a user id to its status code. This file assigns 0 in getIp,
	// 1 and 2 in the StatusChangeOrNot_* functions, and 3 for eliminated users.
	Status map[string]int

	// status_learning lists the user ids that were switched to status 1.
	status_learning []string

	// NameList records each user id the first time getIp sees it.
	NameList []string

	// IpList maps a user id to the pseudo address handed out by setIp.
	IpList map[string]string

	// wantToSendHash marks (with value 1) every user that has a real message
	// in the current round.
	wantToSendHash map[string]int

	// packetLength is the fixed size, in bytes, outgoing packets are padded to.
	packetLength int
)
- func GetConn() redis.Conn {
- conn, err := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
- if err != nil {
- fmt.Println("connect redis error redis err", err)
- return nil
- }
- return conn
- }
- func interfaceToString(x interface{}) string {
- xx, err1 := redis.Strings(x, nil)
- if err1 != nil {
- fmt.Println("interfaceToString err", err1)
- fmt.Println("xx=", xx, "xx[0]=", xx[0])
- }
- xxx := xx[0]
- return xxx
- }
// times is the counter behind setIp; it holds the next value to hand out.
var times int

// setIp returns the current value of times as a decimal string and then
// advances the counter by one. The counter is a plain global, so setIp is not
// safe for concurrent use.
func setIp() string {
	next := strconv.Itoa(times)
	times++
	return next
}
- var rejoin_number int
- func getIp(uid string) string {
- if IpList[uid] != "" {
- if Status[uid] != 3 {
- return IpList[uid]
- } else {
- rejoin_number++
- Status[uid] = 0
- x := setIp()
- IpList[uid] = x
- db1_key_channel <- uid
- db1_val_channel <- x
- return x
- }
- } else {
- NameList = append(NameList, uid)
- Status[uid] = 0
- x := setIp()
- IpList[uid] = x
- db1_key_channel <- uid
- db1_val_channel <- x
- return x
- }
- }
- //db2
- func pushIntoDB2(uid, time, msg string) {
- k1 := uid + "_msg"
- k2 := uid + "_time"
- db2_msg_key_channel <- k1
- db2_time_key_channel <- k2
- db2_msg_val_channel <- msg
- db2_time_val_channel <- time
- }
// Scratch fields that test2 overwrites for every record it reads.
var (
	// User_id_queue holds the "User_id" field of the current record.
	User_id_queue string

	// Msg_queue holds the "Hashtags" field of the current record.
	Msg_queue string

	// Timestamp_queue holds the "Timestamp" field of the current record.
	Timestamp_queue string

	// User_Ip holds the value getIp returned for the current record's user.
	User_Ip string
)
- var db1_key_channel chan string
- var db1_val_channel chan string
- var mutex_db1 sync.Mutex
- //db1
- func db1_Set_Goroutine() {
- defer from_db0_input_wg.Done()
- conn := GetConn()
- conn.Do("select", 1)
- for {
- mutex_db1.Lock()
- x, ok1 := <-db1_key_channel
- y, ok2 := <-db1_val_channel
- mutex_db1.Unlock()
- if !ok1 || !ok2 {
- return
- }
- conn.Do("set", x, y)
- }
- }
- var db2_time_key_channel chan string
- var db2_time_val_channel chan string
- var mutex_db2_time sync.Mutex
- //db2
- func db2_Time_Set_Goroutine() {
- defer from_db0_input_wg.Done()
- conn := GetConn()
- conn.Do("select", 2)
- for {
- mutex_db2_time.Lock()
- x, ok1 := <-db2_time_key_channel
- y, ok2 := <-db2_time_val_channel
- mutex_db2_time.Unlock()
- if !ok1 || !ok2 {
- return
- }
- conn.Do("lpush", x, y)
- }
- }
- var db2_msg_key_channel chan string
- var db2_msg_val_channel chan string
- var mutex_db2_msg sync.Mutex
- func db2_Msg_Set_Goroutine() {
- defer from_db0_input_wg.Done()
- conn := GetConn()
- conn.Do("select", 2)
- for {
- mutex_db2_msg.Lock()
- x, ok1 := <-db2_msg_key_channel
- y, ok2 := <-db2_msg_val_channel
- mutex_db2_msg.Unlock()
- if !ok1 || !ok2 {
- return
- }
- conn.Do("lpush", x, y)
- }
- }
- func test2(finalTime string, lastTimesIndex int) (finalIndex int) {
- fmt.Println("text2() starts")
- db1_key_channel = make(chan string, channelSize)
- db1_val_channel = make(chan string, channelSize)
- db2_time_key_channel = make(chan string, channelSize)
- db2_time_val_channel = make(chan string, channelSize)
- db2_msg_key_channel = make(chan string, channelSize)
- db2_msg_val_channel = make(chan string, channelSize)
- wantToSendHash = make(map[string]int)
- conn := GetConn()
- for i := lastTimesIndex; ; i++ {
- time1, err1 := conn.Do("hmget", i, "Timestamp")
- if err1 != nil {
- fmt.Println("Timestamp hmget,err1=", err1)
- }
- Timestamp_queue = interfaceToString(time1)
- if Timestamp_queue > finalTime {
- finalIndex = i
- fmt.Println("timeout,i", i)
- break
- } else {
- u_id, err := conn.Do("hmget", i, "User_id")
- if err != nil {
- fmt.Println("u_id hmget err1=", err)
- }
- User_id_queue = interfaceToString(u_id)
- wantToSendHash[User_id_queue] = 1
- messages, err2 := conn.Do("hmget", i, "Hashtags")
- if err2 != nil {
- fmt.Println("message hmget,err2=", err2)
- }
- Msg_queue = interfaceToString(messages)
- User_Ip = getIp(User_id_queue)
- pushIntoDB2(User_id_queue, Timestamp_queue, Msg_queue)
- }
- }
- for i := 0; i < 130; i++ {
- from_db0_input_wg.Add(3)
- go db1_Set_Goroutine()
- go db2_Time_Set_Goroutine()
- go db2_Msg_Set_Goroutine()
- }
- close(db1_key_channel)
- close(db1_val_channel)
- close(db2_time_key_channel)
- close(db2_time_val_channel)
- close(db2_msg_key_channel)
- close(db2_msg_val_channel)
- from_db0_input_wg.Wait()
- fmt.Println("text2() ends")
- return
- }
// Channels finalSend uses to hand user ids to its worker goroutines.
var (
	// real_msg_channel carries the ids of users with a real message this round.
	real_msg_channel chan string

	// cover_msg_channel carries the ids taken from status_learning.
	cover_msg_channel chan string
)

// megred is the JSON payload of one outgoing packet. Code in this file builds
// it with positional literals, so the field order must not change.
type megred struct {
	// Megred_Timestamp holds the timestamps of the merged messages.
	Megred_Timestamp []string

	// Megred_Hashtags holds the message bodies (the "Hashtags" field).
	Megred_Hashtags []string

	// Megred_User_Id is the id of the user the packet belongs to.
	Megred_User_Id string

	// Megred_Ip is the pseudo address taken from IpList.
	Megred_Ip string

	// Cover_Msg is "yes" for cover traffic and "no" otherwise.
	Cover_Msg string
}

// from_db0_input_wg is signalled by every Redis writer goroutine when it exits.
var from_db0_input_wg sync.WaitGroup
- //used for sendOut_real
- func putWantToSendHashIntoChannel(real_msg_channel chan<- string) {
- fmt.Println("wantToSendHash length", len(wantToSendHash))
- for i := range wantToSendHash {
- real_msg_channel <- i
- }
- }
- //used for sendOut_cov
- func putStatus_learningIntoChannel(cover_msg_channel chan<- string) {
- fmt.Println("status_learning length", len(status_learning))
- for _, v := range status_learning {
- cover_msg_channel <- v
- }
- }
// getHttpConnection opens the TCP connection to 192.168.32.144:20000 that the
// send stage writes its packets to.
//
// It returns nil, after printing the error, when the dial fails; callers must
// check for nil before writing.
func getHttpConnection() net.Conn {
	const agentAddr = "192.168.32.144:20000"
	conn, err := net.Dial("tcp", agentAddr)
	if err != nil {
		// Report the address that was actually dialed.
		fmt.Println("conn err dial "+agentAddr+" failed", err)
		return nil
	}
	return conn
}
- //same with v1
- func merge(v string, conn redis.Conn) *megred {
- conn.Do("select", "2")
- a := v + "_time"
- b := v + "_msg"
- l1, err_L1 := conn.Do("LLEN", a)
- if err_L1 != nil {
- fmt.Println("err_L1=", err_L1)
- }
- length1, err_length1 := redis.Int(l1, nil)
- if err_length1 != nil {
- fmt.Println("err_length1=", err_length1)
- }
- l2, err_L2 := conn.Do("LLEN", b)
- if err_L2 != nil {
- fmt.Println("err_L2=", err_L2)
- }
- length2, err_length2 := redis.Int(l2, nil)
- if err_length2 != nil {
- fmt.Println("err_length2=", err_length2)
- }
- //time
- time_middle, _ := conn.Do("lrange", a, 0, length1-1)
- x, err_x := redis.Strings(time_middle, nil)
- if err_x != nil {
- fmt.Println("err_x=", err_x)
- }
- aa := x
- //msg
- msg_middle, _ := conn.Do("lrange", b, 0, length2-1)
- y, errr_y := redis.Strings(msg_middle, nil)
- if errr_y != nil {
- fmt.Println("errr_y=", errr_y)
- }
- bb := y
- //clean two queue
- for i := 0; i < length1; i++ {
- conn.Do("rpop", a)
- }
- for i := 0; i < length2; i++ {
- conn.Do("rpop", b)
- }
- ip_middle := IpList[v]
- merged_result := &megred{
- aa,
- bb,
- v,
- ip_middle,
- "no",
- }
- return merged_result
- }
// Per-round counters of the send stage and the mutexes used around them.
var (
	// cov_lock guards covNumber.
	cov_lock sync.Mutex

	// covNumber counts the cover packets written by sendOut_cover.
	covNumber int

	// real_lock guards realNumber.
	real_lock sync.Mutex

	// realNumber counts the packets sendOut_real wrote directly to the agent.
	realNumber int

	// arrival_msg_number counts users handled by sendOut_real in status 0.
	arrival_msg_number int

	// learning_msg_number counts users handled in status 1.
	learning_msg_number int

	// online_msg_number counts users handled by sendOut_real in status 2.
	online_msg_number int

	// learning_msg_number_wg is, despite its name, the mutex held while
	// learning_msg_number is incremented.
	learning_msg_number_wg sync.Mutex

	// online_msg_number_wg is, despite its name, the mutex held while
	// online_msg_number and test_list are updated.
	online_msg_number_wg sync.Mutex

	// test_list collects the ids of the status-2 users seen by sendOut_real.
	test_list []string
)
// users is the scheduler state kept for one online user.
type users struct {
	// Uid is the user id.
	Uid string

	// Chance is the remaining budget; send_each_user spends one unit per cover
	// message and one per delayed round.
	Chance int

	// Queue holds the padded packets waiting to be sent, oldest first.
	Queue [][]byte

	// Group is the group whose Pattern drives this user.
	Group *group

	// Used_cover counts the cover messages sent for this user.
	Used_cover int

	// Used_delay counts the rounds in which a queued packet was held back.
	Used_delay int

	// Pointer is the index of the next Pattern entry to apply.
	Pointer int
}

// group is a set of users that share one send pattern.
type group struct {
	// Group_id identifies the group; pattern() assigns i + batch*k_val.
	Group_id int

	// Users_list holds the members of the group.
	Users_list []*users

	// Pattern is the per-round schedule; send_each_user sends on entries equal
	// to 1.
	Pattern []int
}
- //used in pattern()
- var k_val int
- var batch int
- var count_cover_delay_lock sync.Mutex
- var total_used_cover int
- var total_used_delay int
- var int_to_group map[int]*group
- var grouplist []*group
- var online_users_hash map[string]*users
- var online_users_hash_lock sync.Mutex
- var users_channel chan *users
- var users_channel_wg sync.WaitGroup
- var eliminated_users_list []string
- //get fixed scheduler for each anonymity set
- func pattern() {
- filePath1 := "/home/it/middle_data/filtered_good_name_list.txt"
- content1, err1 := ioutil.ReadFile(filePath1)
- if err1 != nil {
- fmt.Println("err1", err1)
- panic(err1)
- }
- m1 := strings.Split(string(content1), "\n")
- filePath2 := "/home/it/middle_data/groupingResult.txt"
- content2, err2 := ioutil.ReadFile(filePath2)
- if err2 != nil {
- fmt.Println("err2", err2)
- panic(err2)
- }
- m2 := strings.Split(string(content2), "\n")
- filePath3 := "/home/it/middle_data/pattern.txt"
- content3, err3 := ioutil.ReadFile(filePath3)
- if err3 != nil {
- fmt.Println("err3", err3)
- panic(err3)
- }
- m3 := strings.Split(string(content3), "\n")
- //creat k_val pattern list
- pattern := make([][]int, k_val-2)
- for i := 0; i < k_val-2; i++ {
- pattern[i] = make([]int, 0)
- }
- for i := 0; i < len(m3)-1; i++ {
- middle := strings.Split(m3[i], " ")
- for j := 0; j < len(middle); j++ {
- mm, _ := strconv.Atoi(middle[j])
- pattern[i] = append(pattern[i], mm)
- }
- }
- for i := 0; i < k_val-1; i++ {
- var g group
- g.Group_id = i + batch*k_val
- if i < k_val-2 {
- g.Pattern = pattern[i]
- }
- g.Users_list = make([]*users, 0)
- int_to_group[i+batch*k_val] = &g
- grouplist = append(grouplist, &g)
- }
- //creat structure for each user
- for i := 0; i < len(m2)-1; i++ {
- var u users
- u.Uid = m1[i]
- u.Chance = 20 // init chance
- u.Queue = make([][]byte, 0)
- middle, _ := strconv.Atoi(m2[i])
- u.Group = int_to_group[middle]
- u.Used_cover = 0
- u.Used_delay = 0
- u.Pointer = 0
- int_to_group[middle].Users_list = append(int_to_group[middle].Users_list, &u)
- online_users_hash_lock.Lock()
- online_users_hash[u.Uid] = &u
- online_users_hash_lock.Unlock()
- }
- for _, v := range int_to_group {
- fmt.Println(v.Pattern, len(v.Users_list))
- }
- users_numbers := 0
- special_numbers := 0
- for i, v := range online_users_hash {
- if i[0] > '9' || i[0] < '0' || i == "" {
- log.Println("err", i, v)
- special_numbers++
- } else {
- users_numbers++
- }
- }
- log.Println()
- log.Println("users_numbers", users_numbers, "special_numbers", special_numbers)
- }
- //cache messages from client queue into scheduler queue
- func send_online(c net.Conn) {
- defer log.Println("send_online ends")
- users_channel = make(chan *users, 100000)
- for _, v := range online_users_hash {
- if v == nil {
- continue
- }
- if v.Group.Group_id%k_val != k_val-2 {
- users_channel <- v
- }
- }
- close(users_channel)
- for i := 0; i < 1000; i++ {
- users_channel_wg.Add(1)
- go send_each_user(c)
- }
- users_channel_wg.Wait()
- }
- //follow the fixed scheduler, use it to reshape users' traffic
- func send_each_user(c net.Conn) {
- defer users_channel_wg.Done()
- for {
- u, ok := <-users_channel
- if !ok {
- return
- } else {
- if len(u.Group.Pattern) == 0 {
- log.Println("err,u=", u)
- }
- step := u.Group.Pattern[u.Pointer] //pointer points to next bit
- u.Pointer = u.Pointer + 1
- if step == 1 { //should send in this round
- //pop a message from scheduler queue and send it out
- if len(u.Queue) > 0 {
- x := u.Queue[0]
- u.Queue = u.Queue[1:]
- c.Write(x)
- continue
- } else {
- //can still use cover message
- if u.Chance > 0 {
- cover := &megred{
- []string{},
- []string{},
- "",
- "cover",
- "yes",
- }
- cov_mid := *cover
- cov_mid.Megred_User_Id = u.Uid
- cov_mid.Cover_Msg = "yes"
- cov_mid.Megred_Hashtags = []string{"[cover]"}
- cov_mid.Megred_Ip = IpList[u.Uid]
- cov_mid.Megred_Timestamp = []string{"000000000"}
- jsons, err_json := json.Marshal(cov_mid)
- if err_json != nil {
- fmt.Println("err_json=", err_json)
- }
- total_length := packetLength
- padding_length := total_length - len(jsons)
- pd := make([]byte, padding_length)
- jsons = append(jsons, pd...)
- if len(jsons) != packetLength {
- fmt.Println("err,cover=", jsons)
- }
- c.Write(jsons)
- u.Chance = u.Chance - 1
- u.Used_cover++
- continue
- } else {
- //queue is empty and has no chance,then do nothing
- continue
- }
- }
- } else {
- //step==0,should not send message in this round
- if len(u.Queue) > 0 { //queue is not empty, use a delay
- if u.Chance > 0 { //has chance, can use delay
- u.Chance = u.Chance - 1
- u.Used_delay++
- continue
- } else { //has no chance,cannot use delay
- x := u.Queue[0]
- u.Queue = u.Queue[1:]
- c.Write(x)
- continue
- }
- } else { //queue is empty,do nothing
- continue
- }
- }
- }
- }
- }
- //for real message, process them according to user's status
- //similar with v1,the only difference is that don's send real message to ACS,but firstly cache them into scheduler queue
- func sendOut_real(list <-chan string, c net.Conn, cover *megred) {
- defer finalSend_wg_real.Done()
- time.Sleep(time.Millisecond * 100)
- conn := GetConn()
- conn.Do("select", "2")
- for {
- v, ok := <-list
- if !ok {
- return
- }
- status_v := Status[v]
- switch status_v {
- case 0:
- arrival_msg_number++
- case 1:
- learning_msg_number_wg.Lock()
- learning_msg_number++
- learning_msg_number_wg.Unlock()
- case 2:
- online_msg_number_wg.Lock()
- online_msg_number++
- test_list = append(test_list, v)
- online_msg_number_wg.Unlock()
- }
- rst := merge(v, conn)
- jsons, err_json := json.Marshal(*rst)
- if err_json != nil {
- fmt.Println("err_json=", err_json)
- }
- total_length := packetLength
- padding_length := total_length - len(jsons)
- if jsons[len(jsons)-1] != 125 {
- fmt.Println("err,rst=", rst)
- break
- }
- if padding_length <= 0 {
- fmt.Println("too long,use cover instead of this real message")
- cov_mid := *cover
- cov_mid.Megred_User_Id = v
- cov_mid.Cover_Msg = "no"
- cov_mid.Megred_Hashtags = []string{"[other]"}
- cov_mid.Megred_Ip = IpList[v]
- cov_mid.Megred_Timestamp = []string{"000000000"}
- jsons, err_json := json.Marshal(cov_mid)
- if err_json != nil {
- fmt.Println("err_json=", err_json)
- }
- total_length := packetLength
- padding_length := total_length - len(jsons)
- pd := make([]byte, padding_length)
- jsons = append(jsons, pd...)
- if len(jsons) != packetLength {
- fmt.Println("err,cover=", jsons)
- }
- if status_v != 2 || online_users_hash[v] == nil {
- c.Write(jsons)
- real_lock.Lock()
- realNumber++
- real_lock.Unlock()
- continue
- }
- //online
- if status_v == 2 { //online
- if online_users_hash[v].Group.Group_id%k_val != k_val-2 {
- online_users_hash_lock.Lock()
- middle_user := online_users_hash[v]
- online_users_hash_lock.Unlock()
- middle_user.Queue = append(middle_user.Queue, jsons)
- continue
- } else { //outliers
- c.Write(jsons)
- real_lock.Lock()
- realNumber++
- real_lock.Unlock()
- continue
- }
- }
- continue
- }
- pd := make([]byte, padding_length)
- jsons = append(jsons, pd...)
- if len(jsons) != packetLength {
- fmt.Println("err,rst=", rst)
- }
- //not online, or outliers, directly send out
- if status_v != 2 || online_users_hash[v] == nil {
- c.Write(jsons)
- real_lock.Lock()
- realNumber++
- real_lock.Unlock()
- continue
- }
- //online
- if status_v == 2 { //online,not low frequence;normal or ontliers
- if online_users_hash[v].Group.Group_id%k_val != k_val-2 {
- online_users_hash_lock.Lock()
- middle_user := online_users_hash[v]
- online_users_hash_lock.Unlock()
- middle_user.Queue = append(middle_user.Queue, jsons)
- continue
- } else { //outliers
- c.Write(jsons)
- real_lock.Lock()
- realNumber++
- real_lock.Unlock()
- continue
- }
- }
- }
- }
- //
- func sendOut_cover(list <-chan string, c net.Conn, cover *megred) {
- defer finalSend_wg_cov.Done()
- time.Sleep(time.Millisecond * 100)
- for {
- v, ok := <-list
- if !ok {
- break
- } else {
- if wantToSendHash[v] == 1 {
- continue
- } else {
- status_v := Status[v]
- switch status_v {
- case 1:
- learning_msg_number_wg.Lock()
- learning_msg_number++
- learning_msg_number_wg.Unlock()
- }
- cov_mid := *cover
- cov_mid.Megred_User_Id = v
- cov_mid.Cover_Msg = "yes"
- cov_mid.Megred_Hashtags = []string{"[cover]"}
- cov_mid.Megred_Ip = IpList[v]
- cov_mid.Megred_Timestamp = []string{"000000000"}
- jsons, err_json := json.Marshal(cov_mid)
- if err_json != nil {
- fmt.Println("err_json=", err_json)
- }
- total_length := packetLength
- padding_length := total_length - len(jsons)
- pd := make([]byte, padding_length)
- jsons = append(jsons, pd...)
- if len(jsons) != packetLength {
- fmt.Println("出错了,长度不对,cover=", jsons)
- }
- c.Write(jsons)
- cov_lock.Lock()
- covNumber++
- cov_lock.Unlock()
- }
- }
- }
- }
- //same with v1
- func roundEnd(c net.Conn) {
- x := []byte("roundEnd")
- total_length := packetLength
- padding_length := total_length - len(x)
- pd := make([]byte, padding_length)
- x = append(x, pd...)
- if len(x) != packetLength {
- fmt.Println("err,x=", x)
- }
- c.Write(x)
- }
- var finalSend_wg_cov sync.WaitGroup
- var finalSend_wg_real sync.WaitGroup
- //send messages from scheduler queue to agent/ACS
- func finalSend(c net.Conn) {
- cover := &megred{
- []string{},
- []string{},
- "",
- "cover",
- "yes",
- }
- real_msg_channel = make(chan string)
- cover_msg_channel = make(chan string)
- for i := 0; i < 1000; i++ {
- finalSend_wg_real.Add(1)
- finalSend_wg_cov.Add(1)
- go sendOut_real(real_msg_channel, c, cover)
- go sendOut_cover(cover_msg_channel, c, cover)
- }
- putWantToSendHashIntoChannel(real_msg_channel)
- putStatus_learningIntoChannel(cover_msg_channel)
- close(real_msg_channel)
- close(cover_msg_channel)
- finalSend_wg_real.Wait()
- finalSend_wg_cov.Wait()
- time.Sleep(time.Millisecond * 100)
- send_online(c)
- roundEnd(c)
- }
- func initDB1_DB2() {
- conn := GetConn()
- conn.Do("select", "1")
- conn.Do("flushdb")
- conn.Do("keys *")
- conn.Do("select", "2")
- conn.Do("flushdb")
- conn.Do("keys *")
- }
- func start() {
- initDB1_DB2()
- times = 0
- IpList = make(map[string]string)
- Status = make(map[string]int)
- NameList = make([]string, 0)
- status_learning = make([]string, 0)
- packetLength = 3000
- k_val = 15
- batch = 0
- int_to_group = make(map[int]*group)
- grouplist = make([]*group, 0)
- total_used_cover = 0
- total_used_delay = 0
- online_users_hash = make(map[string]*users)
- eliminated_users_list = make([]string, 0)
- }
// timeAddOneHour parses input as a local-time timestamp in the layout
// "20060102150405" (yyyyMMddHHmmss), adds one hour and returns the result in
// the same layout.
//
// A parse failure is printed and the zero time is used instead, so the result
// is then "00010101010000".
func timeAddOneHour(input string) (outpute string) {
	const layout = "20060102150405"

	zone, zoneErr := time.LoadLocation("Local")
	if zoneErr != nil {
		fmt.Println("err_loc=", zoneErr)
	}

	parsed, parseErr := time.ParseInLocation(layout, input, zone)
	if parseErr != nil {
		fmt.Println("err_the_time=", parseErr)
	}

	// Change the factor (or the unit) here to use a different round length.
	step, _ := time.ParseDuration("1h")
	return parsed.Add(1 * step).Format(layout)
}
- var learning_phase_start_round int
- var round int
- //same with v1
- func StatusChangeOrNot_arrival_to_learning() {
- defer log.Println("StatusChangeOrNot_arrival_to_learning() ends")
- fmt.Println("StatusChangeOrNot_arrival_to_learning() starts")
- listener, err := net.Listen("tcp", "192.168.32.144:20002")
- if err != nil {
- fmt.Println("StatusChangeOrNot_arrival_to_learning(),as server,listen 192.168.32.144:20002 failed ", err)
- return
- }
- conn, err := listener.Accept()
- if err != nil {
- fmt.Println("accept failed,StatusChangeOrNot_arrival_to_learning err", err)
- return
- }
- var tmp [10000]byte
- res := ""
- for {
- n, err := conn.Read(tmp[:])
- if err != nil {
- fmt.Println("read from conn failed", err)
- break
- }
- if string(tmp[:8]) == "noChange" {
- log.Println("StatusChangeOrNot_arrival_to_learning ends,no change")
- listener.Close()
- return
- } else {
- result := string(tmp[:n])
- res = res + result
- if string(tmp[:9]) == "changeEnd" {
- listener.Close()
- break
- }
- }
- }
- res = strings.Replace(res, " ", "", -1)
- res = strings.Replace(res, "\n", "", -1)
- str_arr := strings.Split(res, ",")
- learning_phase_start_round = round
- for _, str := range str_arr {
- Status[str] = 1
- status_learning = append(status_learning, str)
- }
- fmt.Println("StatusChangeOrNot_arrival_to_learningends, someone has changed=", len(status_learning))
- }
- //same with v1
- func StatusChangeOrNot_eliminated() {
- defer fmt.Println("StatusChangeOrNot_eliminated ends")
- log.Println("StatusChangeOrNot_eliminated() starts")
- listener, err := net.Listen("tcp", "192.168.32.144:20002")
- if err != nil {
- fmt.Println("StatusChangeOrNot_eliminated(), as server,listen 192.168.32.144:20002 failed", err)
- return
- }
- conn, err := listener.Accept()
- if err != nil {
- fmt.Println("accept failed,StatusChangeOrNot_eliminated err", err)
- return
- }
- var tmp [10000]byte
- res := ""
- for {
- n, err := conn.Read(tmp[:])
- if err != nil {
- fmt.Println("read from conn failed", err)
- break
- }
- if string(tmp[:8]) == "noChange" {
- fmt.Println("StatusChangeOrNot_eliminatedends, no change")
- listener.Close()
- return
- } else {
- result := string(tmp[:n])
- res = res + result
- if string(tmp[:9]) == "changeEnd" {
- listener.Close()
- break
- }
- }
- }
- res = strings.Replace(res, " ", "", -1)
- res = strings.Replace(res, "\n", "", -1)
- str_arr := strings.Split(res, ",")
- //eliminated users
- for _, str := range str_arr {
- Status[str] = 3
- online_users_hash_lock.Lock()
- middle_u := online_users_hash[str]
- online_users_hash_lock.Unlock()
- if middle_u == nil {
- continue
- }
- if middle_u.Group.Group_id%k_val == k_val-2 {
- continue
- }
- count_cover_delay_lock.Lock()
- total_used_cover = total_used_cover + middle_u.Used_cover
- total_used_delay = total_used_delay + middle_u.Used_delay
- count_cover_delay_lock.Unlock()
- middle_u = nil
- online_users_hash[str] = nil
- }
- log.Println("eliminated users are", str_arr)
- }
- //same with v1
- func refresh_status_learning() {
- if round-learning_phase_start_round == 24 {
- fmt.Println("len(status_learning)=", len(status_learning))
- status_learning = make([]string, 0)
- }
- }
- //same with v1
- func StatusChangeOrNot_learning_to_online() {
- defer log.Println("StatusChangeOrNot_learning_to_online() ends")
- log.Println("StatusChangeOrNot_learning_to_online() starts")
- listener, err := net.Listen("tcp", "192.168.32.144:20002")
- if err != nil {
- fmt.Println("StatusChangeOrNot_learning_to_online(),as server,listen 192.168.32.144:20002 failed", err)
- return
- }
- conn, err := listener.Accept()
- if err != nil {
- fmt.Println("accept failed", err)
- return
- }
- var tmp [10000]byte
- res := ""
- for {
- n, err := conn.Read(tmp[:])
- if err != nil {
- fmt.Println("read from conn failed", err)
- break
- }
- if string(tmp[:8]) == "noChange" {
- listener.Close()
- return
- } else {
- result := string(tmp[:n])
- res = res + result
- if string(tmp[:9]) == "changeEnd" {
- listener.Close()
- break
- }
- }
- }
- res = strings.Replace(res, " ", "", -1)
- res = strings.Replace(res, "\n", "", -1)
- str_arr := strings.Split(res, ",")
- //set status to 1
- number := 0
- for i := 0; i < len(str_arr)-1; i++ {
- Status[str_arr[i]] = 2
- number++
- }
- fmt.Println("changed status number=", number)
- pattern()
- batch = batch + 1
- }
- func main() {
- //lof
- logFile, err := os.OpenFile("log.txt", os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666)
- if err != nil {
- panic(err)
- }
- defer logFile.Close()
- mw := io.MultiWriter(os.Stdout, logFile)
- log.SetOutput(mw)
- start()
- channelSize = 150000
- //1st test point:
- lastTimesIndex := 0
- roundEndTime := "20121101130000"
- //2ed test point:
- //lastTimesIndex := 1479975
- //roundEndTime := "20121110130000"
- for i := 0; i < 35; i++ {
- fmt.Println("------------------------------------------------------------the", i, "th round starts------------------------------------------------------------------")
- c := getHttpConnection()
- arrival_msg_number = 0
- learning_msg_number = 0
- online_msg_number = 0
- rejoin_number = 0
- round = i
- covNumber = 0
- realNumber = 0
- lastTimesIndex = test2(roundEndTime, lastTimesIndex)
- fmt.Println("lastTimesIndex=", lastTimesIndex)
- roundEndTime = timeAddOneHour(roundEndTime)
- fmt.Println("roundEndTime=", roundEndTime)
- fmt.Println("status length", len(Status), "NameList length", len(NameList), "status_learning length", len(status_learning))
- finalSend(c)
- refresh_status_learning()
- fmt.Println("Status length", len(Status))
- StatusChangeOrNot_arrival_to_learning()
- StatusChangeOrNot_learning_to_online()
- StatusChangeOrNot_eliminated()
- fmt.Println("covNumber=", covNumber)
- fmt.Println("realNumber=", realNumber)
- fmt.Println("arrival_msg_number=", arrival_msg_number, "earning_msg_number=", learning_msg_number, "online_msg_number=", online_msg_number)
- log.Println("rejoin_number", rejoin_number)
- fmt.Println("------------------------------------------------------------the", i, "th round ends------------------------------------------------------------------")
- }
- log.Println("total_used_cover,total_used_delay=", total_used_cover, total_used_delay)
- users_number := make([]int, 0)
- total_delay := 0
- total_cover := 0
- total_queue := 0
- total_user := 0
- for _, v := range grouplist {
- for _, m := range v.Users_list {
- total_delay = total_delay + m.Used_delay
- total_cover = total_cover + m.Used_cover
- total_queue = total_queue + len(m.Queue)
- }
- total_user = total_user + len(v.Users_list)
- users_number = append(users_number, len(v.Users_list))
- }
- log.Println("users_number", users_number)
- log.Println("average delay", float64(total_delay)/float64(total_user), " average cover=", float64(total_cover)/float64(total_user), "average queue=", float64(total_queue)/float64(total_user))
- }
|