12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182 |
- package main
- import (
- "encoding/json"
- "fmt"
- "io"
- "io/ioutil"
- "log"
- "math"
- "net"
- "os"
- "os/exec"
- "strconv"
- "strings"
- "sync"
- "time"
- // "github.com/go-echarts/go-echarts/v2/charts"
- // "github.com/go-echarts/go-echarts/v2/opts"
- // "github.com/go-echarts/go-echarts/v2/types"
- "github.com/go-echarts/go-echarts/v2/opts"
- "github.com/gomodule/redigo/redis"
- )
// Sizing and connection bookkeeping.
var (
	// channelSize is the buffer capacity used when the per-round message
	// channels are created (see initChannelAndListAndMap and arrivalToLearn1).
	channelSize int

	// connID is a package-level connection identifier; main resets it to 0.
	connID int
)
// Rec is the decoded form of one incoming message. receive fills it from the
// JSON keys "Megred_Timestamp", "Megred_Hashtags", "Megred_User_Id",
// "Megred_Ip" and "Cover_Msg" (the spelling matches the wire format).
type Rec struct {
	// Megred_Timestamp holds the message timestamps as strings.
	Megred_Timestamp []string
	// Merged_Hashtags holds the values read from the "Megred_Hashtags" key.
	Merged_Hashtags []string
	// Megred_User_Id identifies the sender; it is the key used in Status.
	Megred_User_Id string
	// Megred_Ip is the sender address carried in the message.
	Megred_Ip string
	// Cover_Msg is "yes" for cover traffic; any other value counts as real.
	Cover_Msg string
}
- var arrival_phase_start_list []int
- var arrival_phase_number_list []int
- var arrival_phase_number_list_lock sync.Mutex
- var average_arrival_length []int
- var Status sync.Map
- var NameList []string
- var NameList_Append_Lock sync.Mutex
- var Batch_List []string
- var Batch_List_Append_Lock sync.Mutex
- var Batch_list_threshold int
- var putInChannel chan string
- var receive_wg sync.WaitGroup
- var peocessConn_wg sync.WaitGroup
- var uid_channel chan string
- var uip_channel chan string
- var db14_agentqueue_key chan string
- var db14_agentqueue_val chan string
- var db14_agentqueue_Lock sync.Mutex
- var db13_learning_key chan string
- var db13_learning_val chan int
- var db13_learning_Lock sync.Mutex
- var db12_online_key chan string
- var db12_online_val chan string
- var db12_online_Lock sync.Mutex
- var online_msg_number int
- var online_msg_number_empty int
- var online_msg_in_each_group_in_each_round map[int][]int
- var group_size_in_each_round map[int][]int
- var online_msg_in_each_group_in_each_round_middle_wg sync.Mutex
- var online_msg_in_each_group_in_each_round_middle []int
- var user_group_hashmap map[string]int
- var sendDBToReceiver chan string
- var cleanDBToReceiver_wg sync.WaitGroup
- var sendDBToReceiver_lock sync.Mutex
- var cov_rec int
- var real_rec int
- var cov_lock sync.Mutex
- var real_lock sync.Mutex
- var packetLength int
- var total_cover int
- var total_cover_lock sync.Mutex
- var groupSet_Number int
- var number_group_hashmap map[int]*Group
- type Group struct {
- Group_Id int
- Group_User_List []string
- Group_Size int
- Group_Size_copy int
- Group_Active_Threshold int
- Group_active bool
- Group_low_frequence bool
- Group_start_round int
- Group_online_sum_length int
- Group_delay_duration_sum time.Duration
- Group_outlier_flag bool
- Group_send_times int
- Group_sleep_times int
- Group_cover int
- }
- var group_list []*Group
- //same with v1 and v2
- func receive(c net.Conn) {
- defer receive_wg.Done()
- for {
- a, ok_return := <-putInChannel
- if !ok_return {
- return
- }
- mid := []byte(a)
- length := 0
- if a == "" {
- continue
- }
- for i := len(a) - 1; ; i-- {
- if i < 0 {
- log.Println("err,a=", a, "len(a)=", len(a), "len(mid)=", len(mid), "mid=", mid)
- }
- if mid[i] == 125 { //125=}
- length = i
- break
- }
- }
- mid = mid[:length+1]
- var stru map[string]interface{}
- err_unmarshal := json.Unmarshal(mid, &stru)
- if err_unmarshal != nil {
- log.Println(",err_unmarshal=", err_unmarshal, "len(mid=)", len(mid), "len(a)=", len(a))
- }
- var structure Rec
- //1,time
- times, err_time := redis.Strings(stru["Megred_Timestamp"], nil)
- if err_time != nil {
- log.Println("err_time", err_time, "a=", a)
- }
- structure.Megred_Timestamp = times
- //2,msg
- msgs, err_msgs := redis.Strings(stru["Megred_Hashtags"], nil)
- if err_msgs != nil {
- log.Println("err_msgs", err_msgs, "a", a)
- }
- structure.Merged_Hashtags = msgs
- //3,uid
- uid, err_uid := redis.String(stru["Megred_User_Id"], nil)
- if err_uid != nil {
- log.Println("err_uid", err_uid, "a", a)
- }
- structure.Megred_User_Id = uid
- //4,ip
- ip, err_ip := redis.String(stru["Megred_Ip"], nil)
- if err_ip != nil {
- log.Println("err_ip", err_ip, "a", a)
- }
- structure.Megred_Ip = ip
- //5.cover
- cov, err_cover := redis.String(stru["Cover_Msg"], nil)
- if err_cover != nil {
- log.Println("err_cover", err_cover)
- break
- }
- structure.Cover_Msg = cov
- staa, ok := Status.Load(structure.Megred_User_Id)
- if !ok {
- if structure.Megred_User_Id == "" {
- continue
- }
- Status.Store(structure.Megred_User_Id, 0)
- NameList_Append_Lock.Lock()
- NameList = append(NameList, structure.Megred_User_Id)
- NameList_Append_Lock.Unlock()
- Batch_List_Append_Lock.Lock()
- Batch_List = append(Batch_List, structure.Megred_User_Id)
- Batch_List_Append_Lock.Unlock()
- db14_agentqueue_Lock.Lock()
- db14_agentqueue_key <- structure.Megred_User_Id
- db14_agentqueue_val <- a
- db14_agentqueue_Lock.Unlock()
- arrival_phase_start_list = append(arrival_phase_start_list, round)
- continue
- } else {
- if staa == 0 {
- //1,arrival
- db14_agentqueue_Lock.Lock()
- db14_agentqueue_key <- structure.Megred_User_Id
- db14_agentqueue_val <- a
- db14_agentqueue_Lock.Unlock()
- continue
- }
- //b,learning
- if staa == 1 {
- if structure.Cover_Msg == "yes" {
- db13_learning_Lock.Lock()
- db13_learning_key <- structure.Megred_User_Id
- db13_learning_val <- 0
- db13_learning_Lock.Unlock()
- cov_lock.Lock()
- cov_rec++
- cov_lock.Unlock()
- } else {
- db13_learning_Lock.Lock()
- db13_learning_key <- structure.Megred_User_Id
- db13_learning_val <- 1
- db13_learning_Lock.Unlock()
- real_lock.Lock()
- real_rec++
- real_lock.Unlock()
- }
- c.Write(mid)
- continue
- }
- //c,online
- if staa == 2 {
- db12_online_Lock.Lock()
- db12_online_key <- structure.Megred_User_Id
- db12_online_val <- a
- online_msg_number++
- db12_online_Lock.Unlock()
- online_msg_in_each_group_in_each_round_middle_wg.Lock()
- gid := user_group_hashmap[structure.Megred_User_Id]
- online_msg_in_each_group_in_each_round_middle[gid]++
- online_msg_in_each_group_in_each_round_middle_wg.Unlock()
- if structure.Cover_Msg == "yes" {
- total_cover_lock.Lock()
- total_cover++
- total_cover_lock.Unlock()
- }
- continue
- }
- }
- }
- }
- //db14,sima with v1
- var db14_lpush_wg sync.WaitGroup
- var db14_lpush_lock sync.Mutex
- func db14_lpush() {
- defer db14_lpush_wg.Done()
- arrival_msg_num := 0
- log.Println("db14_rpush() starts")
- c_db14, _ := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
- c_db14.Do("select", 14)
- for {
- db14_lpush_lock.Lock()
- x, ok1 := <-db14_agentqueue_key
- y, ok2 := <-db14_agentqueue_val
- db14_lpush_lock.Unlock()
- if ok1 && ok2 {
- c_db14.Do("lpush", x, y)
- arrival_msg_num++
- }
- if (!ok1 || !ok2) && db13_db14_db12_waitForReceive_flag {
- log.Println("db14_lpush() ends,arrival_msg_num=", arrival_msg_num)
- return
- }
- }
- }
- //dn13,same with v1
- var db13_rpush_wg sync.WaitGroup
- var db13_rpush_lock sync.Mutex
- func db13_rpush() {
- defer db13_rpush_wg.Done()
- log.Println("db13_rpush() starts")
- real := 0
- cov := 0
- c_db13, _ := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
- c_db13.Do("select", 13)
- for {
- db13_rpush_lock.Lock()
- x, ok1 := <-db13_learning_key
- y, ok2 := <-db13_learning_val
- db13_rpush_lock.Unlock()
- if ok1 && ok2 {
- if y == 0 {
- cov = cov + 1
- } else {
- real = real + 1
- }
- c_db13.Do("rpush", x, y)
- }
- if db13_db14_db12_waitForReceive_flag && (!ok1 || !ok2) {
- log.Println("db13_rpush() ends,real,cov are", real, cov, "total number", real+cov)
- return
- }
- }
- }
- //db12,same with v1
- var db12_lpush_wg sync.WaitGroup
- var db12_lpush_lock sync.Mutex
- var test_list []string
- func db12_lpush() {
- defer db12_lpush_wg.Done()
- online_msg := 0
- log.Println("db12_lpush()starts")
- c_db12, _ := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
- c_db12.Do("select", 12)
- for {
- db12_lpush_lock.Lock()
- x, ok1 := <-db12_online_key
- test_list = append(test_list, x)
- y, ok2 := <-db12_online_val
- db12_lpush_lock.Unlock()
- if ok1 && ok2 {
- c_db12.Do("lpush", x, y)
- online_msg++
- }
- if db13_db14_db12_waitForReceive_flag && (!ok1 || !ok2) {
- log.Println("db12_lpush() ends,online_msg", online_msg)
- return
- }
- }
- }
- func initDb() {
- log.Println("initDb()")
- c, c_err := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
- if c_err != nil {
- log.Println("c_err=", c_err)
- }
- c.Do("select", 14)
- c.Do("Flushdb")
- c.Do("select", 13)
- c.Do("Flushdb")
- c.Do("select", 12)
- c.Do("Flushdb")
- }
- func initChannelAndListAndMap() {
- log.Println("initChannelAndListAndMap")
- putInChannel = make(chan string, channelSize)
- db14_agentqueue_key = make(chan string, channelSize)
- db14_agentqueue_val = make(chan string, channelSize)
- db13_learning_key = make(chan string, channelSize)
- db13_learning_val = make(chan int, channelSize)
- db12_online_key = make(chan string, channelSize)
- db12_online_val = make(chan string, channelSize)
- }
- var db13_db14_db12_waitForReceive_flag bool
- //same with v1
- func peocessConn(conn net.Conn, connID int) {
- db13_db14_db12_waitForReceive_flag = false
- log.Println("peocessConn starts")
- initChannelAndListAndMap()
- db14_lpush_wg.Add(1)
- go db14_lpush()
- db13_rpush_wg.Add(1)
- go db13_rpush()
- db12_lpush_wg.Add(1)
- go db12_lpush()
- c, c_err := net.Dial("tcp", "192.168.32.144:20001") //recipients address
- if c_err != nil {
- log.Println("conn err,dial 192.168.32.144:20001 failed", c_err)
- }
- for i := 0; i < 500; i++ {
- receive_wg.Add(1)
- go receive(c)
- }
- var tmp [3000]byte
- index := 0
- total_msg_number := 0
- for {
- total_msg_number++
- _, err := conn.Read(tmp[:])
- if err != nil {
- log.Println("read from conn failed", err)
- return
- }
- if string(tmp[:8]) == "roundEnd" {
- log.Println("client ends the conn,total_msg_number", total_msg_number)
- break
- }
- //Sticky bag problem
- if tmp[0] != 123 {
- var corrext_tmp []byte
- var corrext_follow []byte
- var correct_length int
- for i, v := range tmp[:] {
- if v == 123 {
- correct_length = i
- corrext_follow = make([]byte, correct_length)
- _, err_follow := conn.Read(corrext_follow[:])
- if err_follow != nil {
- log.Println("read from conn failed,err_follow=", err_follow)
- return
- }
- corrext_tmp = append(tmp[i:], corrext_follow[:]...)
- if len(corrext_tmp) != packetLength {
- log.Println("len(corrext_tmp)", len(corrext_tmp))
- }
- break
- }
- }
- putInChannel <- string(corrext_tmp[:])
- index++
- log.Println("total_msg_number=", total_msg_number)
- continue
- }
- putInChannel <- string(tmp[:])
- }
- close(putInChannel)
- receive_wg.Wait()
- //check if each anonymity set is active in this round
- if len(group_list) > 0 {
- for _, v := range group_list {
- online_msg_in_each_group_in_each_round[v.Group_Id] = append(online_msg_in_each_group_in_each_round[v.Group_Id], online_msg_in_each_group_in_each_round_middle[v.Group_Id])
- number := float64(0)
- if v.Group_outlier_flag {
- number = float64(group_list[v.Group_Id].Group_Size*group_Threshold_outlier) / float64(100)
- } else {
- number = float64(group_list[v.Group_Id].Group_Size*group_list[v.Group_Id].Group_Active_Threshold) / float64(100)
- }
- if number >= float64(online_msg_in_each_group_in_each_round_middle[v.Group_Id]) {
- group_list[v.Group_Id].Group_active = false
- group_list[v.Group_Id].Group_sleep_times++
- } else {
- group_list[v.Group_Id].Group_active = true
- group_list[v.Group_Id].Group_sleep_times = 0
- }
- }
- }
- db13_db14_db12_waitForReceive_flag = true
- close(db14_agentqueue_key)
- close(db14_agentqueue_val)
- close(db13_learning_key)
- close(db13_learning_val)
- close(db12_online_key)
- close(db12_online_val)
- db13_rpush_wg.Wait()
- db14_lpush_wg.Wait()
- db12_lpush_wg.Wait()
- log.Println("peocessConn ends,len(NameList)=", len(NameList), "Blen(Batch_List)", len(Batch_List))
- }
- //same eith in v1
- func cleanDBToReceiver(httpConn net.Conn) {
- defer cleanDBToReceiver_wg.Done()
- time.Sleep(time.Millisecond * 300)
- c, _ := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
- c.Do("select", 14)
- for {
- x, ok := <-sendDBToReceiver
- if ok {
- sendDBToReceiver_lock.Lock()
- queueLen, _ := c.Do("llen", x)
- ql, err_ql := redis.Int(queueLen, nil)
- if err_ql != nil {
- log.Println("db14 err_ql=", err_ql)
- }
- for i := 0; i < ql; i++ {
- result, err_rpop := c.Do("rpop", x)
- if err_rpop != nil {
- log.Println("db14 err_rpop=", err_rpop)
- }
- res, err_res := redis.String(result, nil)
- if err_res != nil {
- log.Println("db14 err_res=", err_res)
- }
- jsons, err_jsons := json.Marshal(res)
- if err_jsons != nil {
- log.Println("db14 err_jsons=", err_jsons)
- }
- httpConn.Write(jsons)
- }
- sendDBToReceiver_lock.Unlock()
- } else {
- return
- }
- }
- }
- var learningphase_start_round int
- var round int
- var last_Batch_List []string
- var last_Batch_List_to_online []string
- var lastLearningPhase_end_flag bool
- //same with v1
- func arrivalToLearn1() bool {
- log.Println(",arrivalToLearn1() starts")
- time.Sleep(time.Millisecond * 300)
- if len(Batch_List) >= Batch_list_threshold {
- if !lastLearningPhase_end_flag {
- httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002")
- if err1 != nil {
- log.Println(" conn err dial 192.168.32.144:20002 failed", err1)
- }
- sendToCli := []byte("noChange")
- httpConn_client.Write(sendToCli)
- return false
- }
- learningphase_start_round = round
- last_Batch_List = make([]string, len(Batch_List))
- copy(last_Batch_List, Batch_List)
- httpConn_receiver, err2 := net.Dial("tcp", "192.168.32.144:20001")
- if err2 != nil {
- log.Println("conn err,dial 192.168.32.144:20001 failed", err2)
- }
- sendDBToReceiver = make(chan string, channelSize)
- for i := 0; i < 100; i++ {
- cleanDBToReceiver_wg.Add(1)
- go cleanDBToReceiver(httpConn_receiver)
- }
- middle_sum_number := 0
- for _, v := range arrival_phase_start_list {
- middle_sum_number = middle_sum_number + round - v
- }
- arrival_phase_number_list_lock.Lock()
- average_arrival_length = append(average_arrival_length, middle_sum_number/len(arrival_phase_start_list))
- arrival_phase_number_list = append(arrival_phase_number_list, len(arrival_phase_start_list))
- arrival_phase_number_list_lock.Unlock()
- arrival_phase_start_list = make([]int, 0)
- httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002")
- if err1 != nil {
- log.Println("conn err dial 192.168.32.144:20002 failed", err1)
- }
- sendToClient := ""
- for i, v := range Batch_List {
- sendDBToReceiver <- v
- Status.Store(v, 1)
- sendToClient = sendToClient + v + ","
- if (i%6000 == 0 && i != 0) || i == len(Batch_List)-1 {
- sendToCli := []byte(sendToClient)
- httpConn_client.Write(sendToCli)
- sendToClient = ""
- }
- }
- log.Println("len(Batch_List)", len(Batch_List))
- close(sendDBToReceiver)
- time.Sleep(time.Second * 2)
- httpConn_client.Write([]byte("changeEnd"))
- Batch_List = make([]string, 0)
- cleanDBToReceiver_wg.Wait()
- c, c_err := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
- if c_err != nil {
- log.Println("conn err,c_err=", c_err)
- }
- c.Do("select", 14)
- c.Do("flushdb")
- log.Println("len(Batch_List)=", len(Batch_List))
- lastLearningPhase_end_flag = false
- return true
- } else {
- httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002")
- if err1 != nil {
- log.Println("conn err dial 192.168.32.144:20002 failed", err1)
- }
- sendToCli := []byte("noChange")
- httpConn_client.Write(sendToCli)
- return false
- }
- }
- //same with v1
- func refresh_learningPhase_flag() bool {
- defer log.Println("refresh_learningPhase_flag ends")
- log.Println("refresh_learningPhase_flag starts")
- if round-learningphase_start_round == 24 {
- log.Println("round=", round, "learningphase_start_round=", learningphase_start_round)
- lastLearningPhase_end_flag = true
- if round >= 33 {
- lastLearningPhase_end_flag = false
- return false
- }
- last_Batch_List_to_online = make([]string, len(last_Batch_List))
- copy(last_Batch_List_to_online, last_Batch_List)
- return true
- }
- return false
- }
- //same with v1
- func learning_To_online_batchList(flag bool) {
- log.Println("learning_To_online starts,flag=", flag)
- time.Sleep(time.Second * 1)
- if !flag {
- httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002")
- if err1 != nil {
- log.Println("conn err dial 192.168.32.144:20002 failed", err1)
- }
- sendToCli := []byte("noChange")
- httpConn_client.Write(sendToCli)
- return
- } else {
- httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002")
- if err1 != nil {
- log.Println("conn err dial 192.168.32.144:20002 failed", err1)
- }
- sendToClient := ""
- for i, v := range last_Batch_List_to_online {
- Status.Store(v, 2)
- sendToClient = sendToClient + v + ","
- if (i%6000 == 0 && i != 0) || i == len(last_Batch_List_to_online)-1 {
- sendToCli := []byte(sendToClient)
- httpConn_client.Write(sendToCli)
- sendToClient = ""
- }
- }
- time.Sleep(time.Second * 1)
- httpConn_client.Write([]byte("changeEnd"))
- }
- }
- //same with v1
- func learning_function(flag bool) {
- defer log.Println("learning_function ends")
- log.Println("learning_function starts")
- if flag {
- log.Println("round=", round, "learningphase_start_round=", learningphase_start_round)
- //1,执行output.sh;
- _, err := exec.Command("/bin/bash", "/home/it/middle_data/output.sh").CombinedOutput()
- if err != nil {
- log.Println("err,cmd.Output:", err)
- return
- }
- log.Println("output.sh ends")
- starttime := time.Now()
- //final.py
- cmd, err_cmd := exec.Command("python3", "/home/it/learning-phase/final.py").CombinedOutput()
- if err_cmd != nil {
- log.Println("cmd.Output err,err_cmd=", err_cmd)
- return
- }
- log.Println("python result", string(cmd))
- cost := int(time.Since(starttime) / time.Second)
- duration = cost
- //2.5 combine.sh
- _, err_combine := exec.Command("/bin/bash", "/home/it/middle_data/combine.sh").CombinedOutput()
- if err_combine != nil {
- log.Println("combine err,cmd.Output:", err_combine)
- return
- }
- log.Println("combine.sh ends")
- filePath1 := "/home/it/middle_data/filtered_good_name_list.txt"
- content1, err1 := ioutil.ReadFile(filePath1)
- if err1 != nil {
- panic(err1)
- }
- m1 := strings.Split(string(content1), "\n")
- filePath2 := "/home/it/middle_data/groupingResult.txt"
- content2, err2 := ioutil.ReadFile(filePath2)
- if err2 != nil {
- panic(err1)
- }
- m2 := strings.Split(string(content2), "\n")
- log.Println("m1 length=", len(m1), "m2 length=", len(m2))
- for i := 0; i < k_val; i++ {
- var g Group
- g.Group_Id = i + k_val*groupSet_Number
- g.Group_Active_Threshold = group_Threshold_global
- g.Group_Size = 0
- g.Group_Size_copy = 0
- g.Group_active = false
- g.Group_User_List = make([]string, 0)
- g.Group_start_round = round
- g.Group_online_sum_length = 0
- g.Group_delay_duration_sum = time.Duration(0)
- g.Group_send_times = 0
- g.Group_sleep_times = 0
- g.Group_cover = 0
- if i == k_val-1 {
- g.Group_low_frequence = true
- } else {
- g.Group_low_frequence = false
- }
- //outlier
- if i == k_val-2 {
- g.Group_outlier_flag = true
- } else {
- g.Group_outlier_flag = false
- }
- online_msg_in_each_group_in_each_round[g.Group_Id] = make([]int, 0)
- group_size_in_each_round[g.Group_Id] = make([]int, 0)
- group_list = append(group_list, &g)
- number_group_hashmap[i+k_val*groupSet_Number] = &g
- }
- for i := 0; i < len(m1)-1; i++ {
- if i < len(m2)-2 {
- intm2, err_m2 := strconv.Atoi(m2[i])
- if err_m2 != nil {
- log.Println("err_m2=", err_m2)
- return
- }
- user_group_hashmap[m1[i]] = intm2 + groupSet_Number*k_val
- middle_g, ok_middle_g := number_group_hashmap[intm2+groupSet_Number*k_val]
- if !ok_middle_g {
- log.Println(" err group id=", intm2+groupSet_Number*k_val)
- } else {
- middle_g.Group_Size++
- middle_g.Group_User_List = append(middle_g.Group_User_List, m1[i])
- }
- } else {
- user_group_hashmap[m1[i]] = (groupSet_Number+1)*k_val - 1
- middle_g, ok_middle_g := number_group_hashmap[(groupSet_Number+1)*k_val-1]
- if !ok_middle_g {
- log.Println("err group id=", (groupSet_Number+1)*k_val-1)
- return
- } else {
- middle_g.Group_Size++
- middle_g.Group_User_List = append(middle_g.Group_User_List, m1[i])
- }
- }
- }
- sum := 0
- for i := 0; i < k_val; i++ {
- gg := number_group_hashmap[i+k_val*groupSet_Number]
- log.Println("id=", gg.Group_Id, "i+ k_val*groupSet_Number=", i+k_val*groupSet_Number, "的group,其size=", gg.Group_Size)
- sum = sum + gg.Group_Size
- }
- log.Println("groupSet_Number=", groupSet_Number)
- groupSet_Number++
- log.Println("groupSet_Number=", groupSet_Number)
- return
- } else {
- return
- }
- }
- var group_active_function_db12_lock sync.Mutex
- var group_active_function_db12_rpop_lock sync.Mutex
- var eliminated_user_channel chan string
- var active_user_channel chan string
- var k_val int
- var group_Threshold_global int
- var group_Threshold_outlier int
- var delay_limitation int
- var outlier_delay_limitation int
- var Group_sleep_times_threshold int
- //same with v1
- func group_active_function(gg *Group, list_1 []string, list_2 []string) {
- defer log.Println("group_id=", gg.Group_Id, "group_active_function ends,Group_Size=", gg.Group_Size)
- log.Println("group_id=", gg.Group_Id, "group group_active_function starts Group_Size=", gg.Group_Size)
- c_active, err_active := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
- if err_active != nil {
- log.Println("group_active_function conn err,err_active=", err_active)
- return
- }
- gg.Group_online_sum_length = gg.Group_online_sum_length + len(list_2)*(round-gg.Group_start_round)
- gg.Group_Size_copy = gg.Group_Size_copy + len(list_2)
- for _, v := range list_2 {
- x, load_ok := Status.Load(v)
- if load_ok {
- if x != 2 {
- err_result = append(err_result, v)
- }
- }
- Status.Delete(v)
- eliminated_user_channel <- v
- c_active.Do("select", 12)
- group_active_function_db12_lock.Lock()
- _, err_del := c_active.Do("del", v)
- group_active_function_db12_lock.Unlock()
- if err_del != nil {
- log.Println("err_del=", err_del)
- return
- }
- }
- gg.Group_Size = len(list_1)
- gg.Group_User_List = list_1
- c_active.Do("select", 12)
- for _, v := range list_1 {
- x, load_ok := Status.Load(v)
- if load_ok {
- if x != 2 {
- err_result = append(err_result, v)
- }
- }
- group_active_function_db12_rpop_lock.Lock()
- result, err_rpop := c_active.Do("rpop", v)
- group_active_function_db12_rpop_lock.Unlock()
- if err_rpop != nil {
- log.Println("err_rpop=", err_rpop)
- }
- res, err_string := redis.String(result, nil)
- if err_string != nil {
- log.Println("err_string=", err_string)
- }
- active_user_channel <- res
- mid := []byte(res)
- length := 0
- for i := len(res) - 1; ; i-- {
- if i < 0 {
- log.Println("err,res=", res, "len(res)=", len(res), "len(mid)=", len(mid), "mid=", mid)
- }
- if mid[i] == 125 { //125代表}
- length = i
- break
- }
- }
- mid = mid[:length+1]
- var stru map[string]interface{}
- err_unmarshal := json.Unmarshal(mid, &stru)
- if err_unmarshal != nil {
- log.Println("err_unmarshal=", err_unmarshal, "len(mid=)", len(mid))
- }
- cov, err_cover := redis.String(stru["Cover_Msg"], nil)
- if err_cover != nil {
- log.Println("err_cover", err_cover)
- break
- }
- if cov == "yes" {
- continue
- }
- gg.Group_send_times = gg.Group_send_times + 1
- times, err_time := redis.Strings(stru["Megred_Timestamp"], nil)
- if err_time != nil {
- log.Println("err_time", err_time, "a=", res)
- }
- msg_time := times[0]
- loc, err_loc := time.LoadLocation("Local")
- if err_loc != nil {
- fmt.Println("err_loc=", err_loc)
- }
- the_time_of_msg, err_the_time_msg := time.ParseInLocation("20060102150405", msg_time, loc)
- if err_the_time_msg != nil {
- fmt.Println("err_the_time_msg=", err_the_time_msg)
- }
- the_time_of_agent, err_the_time_agent := time.ParseInLocation("20060102150405", "20121101120000", loc)
- if err_the_time_agent != nil {
- fmt.Println("err_the_time_agent=", err_the_time_agent)
- }
- send_out_timepoint := the_time_of_agent.Add(time.Duration(round+1) * time.Hour)
- duration := send_out_timepoint.Sub(the_time_of_msg)
- gg.Group_delay_duration_sum = gg.Group_delay_duration_sum + duration
- }
- }
- var each_group_process_lock sync.Mutex
- var each_group_process_wg sync.WaitGroup
- //same with it in v1
- func each_group_process(g *Group) {
- defer each_group_process_wg.Done()
- c_db12, err_db12 := redis.Dial("tcp", fmt.Sprintf("%s:%d", "127.0.0.1", 6379))
- if err_db12 != nil {
- log.Println("err_db12=", err_db12)
- return
- }
- c_db12.Do("select", 12)
- middle_active_list := make([]string, 0)
- middle_inactive_list := make([]string, 0)
- if g.Group_Size == 1 {
- middle_inactive_list = append(middle_inactive_list, g.Group_User_List[0])
- } else {
- for _, v := range g.Group_User_List {
- each_group_process_lock.Lock()
- agent_queue_length, err_agent_queue_length := c_db12.Do("llen", v)
- each_group_process_lock.Unlock()
- if err_agent_queue_length != nil {
- log.Println("err_agent_queue_length=", err_agent_queue_length)
- return
- }
- length, err_length := redis.Int(agent_queue_length, nil)
- if err_length != nil {
- log.Println("err_length=", err_length)
- return
- }
- if g.Group_outlier_flag {
- if length == 0 || length >= delay_limitation {
- middle_inactive_list = append(middle_inactive_list, v)
- } else {
- middle_active_list = append(middle_active_list, v)
- }
- } else {
- if length >= 1 && length <= delay_limitation {
- middle_active_list = append(middle_active_list, v)
- } else {
- middle_inactive_list = append(middle_inactive_list, v)
- }
- }
- }
- }
- number := float64(0)
- //outlier
- if g.Group_outlier_flag == true {
- number = float64(g.Group_Size*g.Group_Active_Threshold) / float64(100)
- }
- if g.Group_active {
- log.Println("group_id=", g.Group_Id, "group is online, threshold number=", number, "active user number=", len(middle_active_list))
- group_active_function(g, middle_active_list, middle_inactive_list)
- } else {
- log.Println("group_id=", g.Group_Id, "group is offline threshold number=", number, "active user number", len(middle_active_list))
- }
- log.Println("group_id=", g.Group_Id, "group is", g.Group_active, ",active user number=", len(middle_active_list), "inactive user number=", len(middle_inactive_list), "group_size=", g.Group_Size, "Group_User_List长度=", len(g.Group_User_List))
- }
- var db12_send_to_receiver_wg sync.WaitGroup
- var active_user_channel_closed bool
- //same with v1
- func db12_send_to_receiver(c net.Conn) {
- defer db12_send_to_receiver_wg.Done()
- for {
- x, ok := <-active_user_channel
- if ok {
- jsons, err_jsons := json.Marshal(x)
- if err_jsons != nil {
- log.Println("err_jsons=", err_jsons)
- }
- c.Write(jsons)
- } else {
- if active_user_channel_closed {
- return
- } else {
- continue
- }
- }
- }
- }
- var eliminated_user_channel_closed bool
- var online_eliminated_batchList_wg sync.WaitGroup
- //same with v1
- func online_eliminated_batchList() {
- defer online_eliminated_batchList_wg.Done()
- time.Sleep(time.Second * 2)
- httpConn_client, err1 := net.Dial("tcp", "192.168.32.144:20002")
- if err1 != nil {
- log.Println("conn err dial 192.168.32.144:20002 failed", err1)
- }
- number := 0
- eliminated_list := make([]string, 0)
- for {
- v, ok := <-eliminated_user_channel
- if ok {
- number++
- eliminated_list = append(eliminated_list, v)
- } else {
- if eliminated_user_channel_closed {
- break
- } else {
- continue
- }
- }
- }
- if len(eliminated_list) > 0 {
- sendToClient := ""
- for i, v := range eliminated_list {
- sendToClient = sendToClient + v + ","
- if (i%6000 == 0 && i != 0) || i == len(eliminated_list)-1 {
- sendToCli := []byte(sendToClient)
- httpConn_client.Write(sendToCli)
- sendToClient = ""
- }
- }
- time.Sleep(time.Second * 2)
- httpConn_client.Write([]byte("changeEnd"))
- } else {
- httpConn_client.Write([]byte("noChange"))
- }
- log.Println("len(eliminated_list)", len(eliminated_list), "number=", number)
- }
- //same with v1
- func online_cache_process() {
- c, c_err := net.Dial("tcp", "192.168.32.144:20001")
- if c_err != nil {
- log.Println("online msg db12 conn err dial 192.168.32.144:20001 failed", c_err)
- }
- active_user_channel_closed = false
- eliminated_user_channel_closed = false
- for i := 0; i < 10; i++ {
- db12_send_to_receiver_wg.Add(1)
- go db12_send_to_receiver(c)
- }
- online_eliminated_batchList_wg.Add(1)
- go online_eliminated_batchList()
- log.Println("group_list length", len(group_list))
- for _, v := range group_list {
- each_group_process_wg.Add(1)
- each_group_process(v)
- }
- each_group_process_wg.Wait()
- close(active_user_channel)
- active_user_channel_closed = true
- close(eliminated_user_channel)
- eliminated_user_channel_closed = true
- db12_send_to_receiver_wg.Wait()
- online_eliminated_batchList_wg.Wait()
- }
- func record_group_size() {
- for _, v := range group_list {
- group_size_in_each_round[v.Group_Id] = append(group_size_in_each_round[v.Group_Id], v.Group_Size)
- }
- }
- func initMetric() {
- Batch_list_threshold = 10000
- channelSize = Batch_list_threshold + 10000
- round = 0
- lastLearningPhase_end_flag = true
- packetLength = 3000
- k_val = 15
- group_Threshold_global = 30
- group_Threshold_outlier = 30
- delay_limitation = 2
- outlier_delay_limitation = 2
- Group_sleep_times_threshold = 3
- }
- func initNameListAndBatchList() {
- Batch_List = make([]string, 0)
- NameList = make([]string, 0)
- number_group_hashmap = make(map[int]*Group)
- groupSet_Number = 0
- group_list = make([]*Group, 0)
- online_msg_in_each_group_in_each_round = make(map[int][]int)
- user_group_hashmap = make(map[string]int)
- group_size_in_each_round = make(map[int][]int)
- err_result = make([]string, 0)
- arrival_phase_start_list = make([]int, 0)
- average_arrival_length = make([]int, 0)
- arrival_phase_number_list = make([]int, 0)
- }
- func generateLineItems(result []int) []opts.LineData {
- items := make([]opts.LineData, 0)
- for i := 0; i < len(result); i++ {
- items = append(items, opts.LineData{Value: result[i]})
- }
- return items
- }
// Package-level values that main logs once all rounds have finished.
var (
	// err_result is reset to an empty slice by initNameListAndBatchList.
	err_result []string
	// duration is logged together with k_val.
	duration int
)
- func main() {
- //log
- logFile, err := os.OpenFile("log.txt", os.O_CREATE|os.O_APPEND|os.O_RDWR, 0666)
- if err != nil {
- panic(err)
- }
- defer logFile.Close()
- mw := io.MultiWriter(os.Stdout, logFile)
- log.SetOutput(mw)
- connID = 0
- var connID int
- listener, err := net.Listen("tcp", "192.168.32.144:20000")
- if err != nil {
- log.Println("start tcp server 192.168.32.144:20000 failed", err)
- return
- }
- initNameListAndBatchList()
- initMetric()
- initDb()
- for i := 0; i < 35; i++ {
- log.Println("--------------------------------------------------the ", i, "th round starts----------------------------------------------")
- eliminated_user_channel = make(chan string, 100000)
- active_user_channel = make(chan string, 100000)
- online_msg_number = 0
- online_msg_number_empty = 0
- round = i
- cov_rec = 0
- real_rec = 0
- conn, err := listener.Accept()
- if err != nil {
- log.Println("accept failed", err)
- return
- }
- connID = connID + 1
- log.Println("connID", connID)
- peocessConn(conn, connID)
- h24 := refresh_learningPhase_flag()
- arrivalToLearn1()
- learning_function(h24)
- learning_To_online_batchList(h24)
- online_msg_in_each_group_in_each_round_middle = make([]int, k_val*(groupSet_Number))
- online_cache_process()
- log.Println("online_msg_number=", online_msg_number, "online_msg_number_empty=", online_msg_number_empty)
- record_group_size()
- log.Println("len(NameList)", len(NameList))
- log.Println("cov_rec,real_rec are", cov_rec, real_rec)
- log.Println("---------------------------------------------------next round---------------------------------------------------")
- }
- ave_groupsize := make([]int, len(group_size_in_each_round[0]))
- for i, v := range group_list {
- log.Println("group_id=", i, "group,group_size is", group_size_in_each_round[v.Group_Id])
- if !v.Group_low_frequence {
- for i, v := range group_size_in_each_round[v.Group_Id] {
- if v == 0 {
- ave_groupsize[i] = ave_groupsize[i] + 1
- } else {
- ave_groupsize[i] = ave_groupsize[i] + v
- }
- }
- }
- log.Println("group_id=", i, "group,online user in each round is", online_msg_in_each_group_in_each_round[v.Group_Id])
- }
- log.Println(err_result)
- log.Println("k_val=", k_val, "duration=", duration)
- log.Println("arrival phase arrival length and users in each batch", arrival_phase_number_list, average_arrival_length)
- sum_number := 0
- sum_length := 0
- sum_duration := time.Duration(0)
- sum_send_times := 0
- sum_online_cover := 0
- for _, v := range group_list {
- if !v.Group_low_frequence {
- log.Println(v.Group_Id, "Group_size_copy=", v.Group_Size_copy, "online sum length", v.Group_online_sum_length)
- sum_length = v.Group_online_sum_length + sum_length
- sum_number = sum_number + v.Group_Size_copy
- sum_duration = sum_duration + v.Group_delay_duration_sum
- sum_send_times = sum_send_times + v.Group_send_times
- sum_online_cover = sum_online_cover + v.Group_cover
- }
- }
- log.Println("average online length=", float64(sum_length)/float64(sum_number), "sum_number", sum_number, "average latency", sum_duration/time.Duration(sum_send_times), "total_cover=,", total_cover, "average cover=", float64(total_cover)/float64(sum_number))
- ave_groupsize_double := make([]float64, len(ave_groupsize))
- for i, v := range ave_groupsize {
- ave_groupsize_double[i] = math.Round((float64(v) / 14.0))
- }
- log.Println("average group size=", ave_groupsize_double)
- }
|