client.go 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948
  1. package client
  2. /*
  3. #cgo CFLAGS: -O2
  4. #cgo LDFLAGS: -lcrypto -lm
  5. #include "../c/dpf.h"
  6. #include "../c/okvClient.h"
  7. #include "../c/dpf.c"
  8. #include "../c/okvClient.c"
  9. */
  10. import "C"
  11. import (
  12. "2PPS/lib"
  13. "bufio"
  14. "bytes"
  15. "crypto/rand"
  16. "crypto/sha256"
  17. "crypto/tls"
  18. "encoding/json"
  19. "fmt"
  20. "log"
  21. "math/big"
  22. mr "math/rand"
  23. "net"
  24. "os"
  25. "sort"
  26. "strconv"
  27. "strings"
  28. "sync"
  29. "time"
  30. "unsafe"
  31. "golang.org/x/crypto/nacl/box"
  32. )
// tweet groups a message text with the topics it is published under.
// NOTE(review): no code in this file references the type; confirm it is
// still needed.
type tweet struct {
	// Topics lists the topic names attached to the message.
	Topics []string
	// Text is the message body.
	Text string
}
  37. const leader string = "127.0.0.1:4441"
  38. //needs to be changed at leader/client at the same time
  39. const numClients = 1000
  40. //mylimit=8000
  41. //sudo prlimit --nofile=$mylimit --pid $$; ulimit -n $mylimit
  42. //for every terminal
  43. const dataLength int = 256
  44. //Maximum Transport Unit
  45. const mtu int = 1100
  46. var dbWriteSize int
  47. var round int
  48. var topicList []string
  49. var archiveTopicList []string
  50. var neededSubscriptions int
  51. var publisherAmount int
  52. var goodPadding int
  53. var blocksReceived int
  54. var timeBounds []float64
  55. //gui variables
  56. var guiTweet string
  57. var mainTweets string
  58. var archiveTweets string
  59. //this translates to a simulated round length of ~2h
  60. var speedUp float64 = 7200 / ((maxTimePerRound.Seconds()) * 3)
  61. var maxTimePerRound time.Duration = 2 * time.Second
  62. var startTime int
  63. var archiveInterests = make([]int, 1)
  64. var sharedSecret [numClients][2][32]byte = createSharedSecret()
  65. var wantsArchive = make([]byte, 1)
  66. var leaderPublicKey *[32]byte
  67. var followerPublicKey *[32]byte
  68. var clientPrivateKey [numClients]*[32]byte
  69. var clientPublicKey [numClients]*[32]byte
  70. func main() {
  71. f, err := os.OpenFile("evalDataClient", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
  72. if err != nil {
  73. log.Fatalf("error opening file: %v", err)
  74. }
  75. defer f.Close()
  76. log.SetOutput(f)
  77. log.Println("simulated Duration", speedUp)
  78. wg := &sync.WaitGroup{}
  79. for i := 0; i < numClients; i++ {
  80. wg.Add(1)
  81. go Client(i, f)
  82. time.Sleep(1 * time.Millisecond)
  83. }
  84. wg.Wait()
  85. }
  86. func Client(clientNumber int, f *os.File) {
  87. if clientNumber == 0 {
  88. getTimeBounds()
  89. }
  90. generatedPublicKey, generatedPrivateKey, err := box.GenerateKey(rand.Reader)
  91. if err != nil {
  92. panic(err)
  93. }
  94. clientPrivateKey[clientNumber] = generatedPrivateKey
  95. clientPublicKey[clientNumber] = generatedPublicKey
  96. C.initializeCipher()
  97. //initializes the connection to the leader
  98. conf := &tls.Config{
  99. InsecureSkipVerify: true,
  100. }
  101. leaderConn, err := tls.Dial("tcp", leader, conf)
  102. if err != nil {
  103. panic(err)
  104. }
  105. leaderConn.SetDeadline(time.Time{})
  106. //receives topics first so client can participate asap
  107. receiveTopicLists(leaderConn)
  108. //gets the public keys of both servers
  109. var tmpLeaderPubKey [32]byte
  110. _, err = leaderConn.Read(tmpLeaderPubKey[:])
  111. if err != nil {
  112. panic(err)
  113. }
  114. leaderPublicKey = &tmpLeaderPubKey
  115. var tmpFollowerPubKey [32]byte
  116. _, err = leaderConn.Read(tmpFollowerPubKey[:])
  117. if err != nil {
  118. panic(err)
  119. }
  120. followerPublicKey = &tmpFollowerPubKey
  121. //sends own public key
  122. writeTo(leaderConn, clientPublicKey[clientNumber][:])
  123. neededSubscriptionsBytes := readFrom(leaderConn, 4)
  124. neededSubscriptions = byteToInt(neededSubscriptionsBytes)
  125. startTimeBytes := readFrom(leaderConn, 4)
  126. startTime = byteToInt(startTimeBytes)
  127. //setup ends above
  128. //while client is active he is always connected and has to participate
  129. for {
  130. //gets current phase
  131. phase := readFrom(leaderConn, 1)
  132. if phase[0] == 1 {
  133. //gets current dbWriteSize from leader
  134. dbWriteSizeBytes := readFrom(leaderConn, 4)
  135. dbWriteSize = byteToInt(dbWriteSizeBytes)
  136. //roundAsBytes := readFrom(leaderConn, 4)
  137. roundAsBytes := make([]byte, 4)
  138. _, err = leaderConn.Read(roundAsBytes)
  139. if err != nil {
  140. panic(err)
  141. }
  142. round = byteToInt(roundAsBytes)
  143. if clientNumber == 0 {
  144. fmt.Println("Round ", round)
  145. }
  146. //request virtualAddress from leader via pirQuery
  147. encryptedQueryLeader, encryptedQueryFollower := createAuditPIRQuery(clientNumber)
  148. sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false)
  149. pos := receiveVirtualAddress(sharedSecret[clientNumber], leaderConn)
  150. tweet := getGuiTweet()
  151. if clientNumber == numClients-1 {
  152. log.Println("Round", round)
  153. log.Println("publisherAmount", publisherAmount)
  154. log.Println("goodPadding", goodPadding)
  155. log.Println("blocksReceived", blocksReceived)
  156. log.Println("goodPadding Percentage", float64(goodPadding)/float64(blocksReceived))
  157. publisherAmount = 0
  158. }
  159. //prep the query
  160. dataSize := len(tweet)
  161. querySize := make([]byte, 4)
  162. cQuerySize := C.int(byteToInt(querySize))
  163. var dpfQueryA *C.uchar
  164. var dpfQueryB *C.uchar
  165. C.prepQuery(C.int(pos), C.int(dbWriteSize), (*C.uchar)(&tweet[0]), C.int(dataSize), &cQuerySize, &dpfQueryA, &dpfQueryB)
  166. intQuerySize := int(cQuerySize) //byteToInt(querySize)
  167. //write the query
  168. queryAPlaintext := C.GoBytes(unsafe.Pointer(dpfQueryA), C.int(intQuerySize))
  169. //encrypts queryA and appends it to message
  170. var nonce [24]byte
  171. //fill nonce with randomness
  172. _, err = rand.Read(nonce[:])
  173. if err != nil {
  174. panic("couldn't get randomness for nonce!")
  175. }
  176. dpfQueryAEncrypted := box.Seal(nonce[:], queryAPlaintext, &nonce, leaderPublicKey, clientPrivateKey[clientNumber])
  177. //encrypts queryB and appends it to message
  178. queryBPlaintext := C.GoBytes(unsafe.Pointer(dpfQueryB), C.int(intQuerySize))
  179. //fill nonce with randomness
  180. _, err = rand.Read(nonce[:])
  181. if err != nil {
  182. panic("couldn't get randomness for nonce!")
  183. }
  184. dpfQueryBEncrypted := box.Seal(nonce[:], queryBPlaintext, &nonce, followerPublicKey, clientPrivateKey[clientNumber])
  185. //writes the dpfQuery to the leader
  186. dpfLengthBytes := intToByte(len(dpfQueryAEncrypted))
  187. writeTo(leaderConn, dpfLengthBytes)
  188. writeTo(leaderConn, dpfQueryAEncrypted)
  189. writeTo(leaderConn, dpfQueryBEncrypted)
  190. C.free(unsafe.Pointer(dpfQueryA))
  191. C.free(unsafe.Pointer(dpfQueryB))
  192. } else if phase[0] == 3 {
  193. /*
  194. possible Values
  195. 0 : new client
  196. leader expects sharedSecrets, expects pirQuery
  197. 1 : update needed
  198. leader sends topicList, performs local update of sharedSecret, expects pirQuery
  199. 2 : no update needed
  200. nothing
  201. */
  202. subPhase := readFrom(leaderConn, 1)
  203. var encryptedQueryLeader, encryptedQueryFollower []byte
  204. //first time participating
  205. if subPhase[0] == 0 {
  206. receiveTopicLists(leaderConn)
  207. encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(int(subPhase[0]), clientNumber)
  208. sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false)
  209. }
  210. //updates the topic list and what client is interested in
  211. if subPhase[0] == 1 {
  212. receiveTopicLists(leaderConn)
  213. //updates local secret
  214. for index := 0; index < 2; index++ {
  215. sharedSecret[clientNumber][index] = sha256.Sum256(sharedSecret[clientNumber][index][:])
  216. }
  217. encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(int(subPhase[0]), clientNumber)
  218. sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false)
  219. }
  220. receiveTweets(sharedSecret[clientNumber], leaderConn, false, clientNumber)
  221. if len(archiveTopicList) > 0 {
  222. wantsArchive[0] = 1
  223. } else {
  224. wantsArchive[0] = 0
  225. }
  226. writeTo(leaderConn, wantsArchive)
  227. if wantsArchive[0] == 1 && len(archiveTopicList) > 0 {
  228. encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(-1, clientNumber)
  229. sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, true)
  230. receiveTweets(sharedSecret[clientNumber], leaderConn, true, clientNumber)
  231. }
  232. } else {
  233. fmt.Println("Phase", phase)
  234. panic("somethin went wrong")
  235. }
  236. }
  237. }
  238. //creates and sends the pirQuerys for each server
  239. func createPIRQuery(subPhase int, clientNumber int) ([]byte, []byte) {
  240. //later this will be taken from gui, this is only for testing
  241. topicsOfInterest := make([]int, 1)
  242. topicsOfInterest[0] = 0 //mr.Intn(10)
  243. archiveInterests[0] = 0 //mr.Intn(10)
  244. tmpNeededSubscriptions := neededSubscriptions
  245. if tmpNeededSubscriptions > len(topicList) {
  246. tmpNeededSubscriptions = len(topicList)
  247. }
  248. tmptopicsOfInterest := make([]int, len(topicsOfInterest))
  249. copy(tmptopicsOfInterest, topicsOfInterest)
  250. tmpTopicList := make([]string, len(topicList))
  251. copy(tmpTopicList, topicList)
  252. if wantsArchive[0] == 1 && subPhase == -1 {
  253. tmpNeededSubscriptions = len(archiveInterests)
  254. if tmpNeededSubscriptions > len(archiveTopicList) {
  255. tmpNeededSubscriptions = len(archiveTopicList)
  256. }
  257. tmptopicsOfInterest = archiveInterests //todo! take archiveInterests from gui
  258. tmpTopicList = archiveTopicList
  259. }
  260. //creates fake topicsOfInterest if client is boooring
  261. if len(tmptopicsOfInterest) < tmpNeededSubscriptions && subPhase != -1 {
  262. tmptopicsOfInterest = addFakeInterests(len(tmpTopicList), tmptopicsOfInterest, false)
  263. }
  264. //pirQuery [topicsOfInterest][serverAmount][topicAmount]byte
  265. pirQuerys := make([][][]byte, len(tmptopicsOfInterest))
  266. for i := range pirQuerys {
  267. pirQuerys[i] = make([][]byte, 2)
  268. for j := range pirQuerys[i] {
  269. pirQuerys[i][j] = make([]byte, len(tmpTopicList))
  270. }
  271. }
  272. //for leader
  273. //pirQuery will be filled with random bits
  274. for topic := range tmptopicsOfInterest {
  275. for index := range tmpTopicList {
  276. bit, err := rand.Int(rand.Reader, big.NewInt(2))
  277. if err != nil {
  278. panic(err)
  279. }
  280. pirQuerys[topic][0][index] = byte(bit.Int64())
  281. }
  282. }
  283. tmptopicsOfInterestBytes := make([]byte, len(tmpTopicList))
  284. for index := range tmptopicsOfInterest {
  285. if tmptopicsOfInterest[index] == 1 {
  286. tmptopicsOfInterestBytes[index] = 1
  287. }
  288. }
  289. for topicIndex, topic := range tmptopicsOfInterest {
  290. for index := range tmpTopicList {
  291. if topic == index {
  292. if pirQuerys[topicIndex][0][index] == 1 {
  293. pirQuerys[topicIndex][1][index] = 0
  294. } else {
  295. pirQuerys[topicIndex][1][index] = 1
  296. }
  297. } else {
  298. if pirQuerys[topicIndex][0][index] == 0 {
  299. pirQuerys[topicIndex][1][index] = 0
  300. } else {
  301. pirQuerys[topicIndex][1][index] = 1
  302. }
  303. }
  304. }
  305. }
  306. //flattens the querys to be able to send them more efficently
  307. messagesFlattened := make([][]byte, 2)
  308. //adds the sharedSecret to the first pirQuery when first time participating
  309. if subPhase == 0 {
  310. for server := 0; server < 2; server++ {
  311. messagesFlattened[server] = append(messagesFlattened[server], sharedSecret[clientNumber][server][:]...)
  312. }
  313. }
  314. for server := range messagesFlattened {
  315. for topic := range pirQuerys {
  316. messagesFlattened[server] = append(messagesFlattened[server], pirQuerys[topic][server]...)
  317. }
  318. }
  319. var nonce [24]byte
  320. _, err := rand.Read(nonce[:])
  321. if err != nil {
  322. panic("couldn't get randomness for nonce!")
  323. }
  324. encryptedQueryLeader := box.Seal(nonce[:], messagesFlattened[0], &nonce, leaderPublicKey, clientPrivateKey[clientNumber])
  325. _, err = rand.Read(nonce[:])
  326. if err != nil {
  327. panic("couldn't get randomness for nonce!")
  328. }
  329. encryptedQueryFollower := box.Seal(nonce[:], messagesFlattened[1], &nonce, followerPublicKey, clientPrivateKey[clientNumber])
  330. return encryptedQueryLeader, encryptedQueryFollower
  331. }
  332. func sendQuerys(encryptedQueryLeader, encryptedQueryFollower []byte, leaderConn net.Conn, getArchive bool) {
  333. encryptedLength := len(encryptedQueryLeader)
  334. //sends the pirQuerysLength to the leader
  335. writeTo(leaderConn, intToByte(encryptedLength))
  336. //sends the pirQuerys to the leader
  337. writeTo(leaderConn, encryptedQueryLeader)
  338. writeTo(leaderConn, encryptedQueryFollower)
  339. if getArchive {
  340. writeTo(leaderConn, intToByte(len(archiveInterests)))
  341. }
  342. }
  343. func receiveVirtualAddress(sharedSecret [2][32]byte, leaderConn net.Conn) int {
  344. virtualAddressByte := readFrom(leaderConn, 4)
  345. //xores the sharedSecret
  346. for h := 0; h < 2; h++ {
  347. for i := 0; i < 4; i++ {
  348. virtualAddressByte[i] = virtualAddressByte[i] ^ sharedSecret[h][i]
  349. }
  350. }
  351. return byteToInt(virtualAddressByte)
  352. }
  353. func receiveTweets(sharedSecret [2][32]byte, leaderConn net.Conn, getArchive bool, clientNumber int) {
  354. tmpNeededSubscriptions := neededSubscriptions
  355. if tmpNeededSubscriptions > len(topicList) {
  356. tmpNeededSubscriptions = len(topicList)
  357. }
  358. if getArchive {
  359. tmpNeededSubscriptions = len(archiveInterests)
  360. if tmpNeededSubscriptions > len(archiveTopicList) {
  361. tmpNeededSubscriptions = len(archiveTopicList)
  362. }
  363. }
  364. for i := 0; i < tmpNeededSubscriptions; i++ {
  365. if !getArchive {
  366. blocksReceived++
  367. }
  368. //client receives tweets
  369. tweetsLengthBytes := readFrom(leaderConn, 4)
  370. tweetsLength := byteToInt(tweetsLengthBytes)
  371. tweets := readFrom(leaderConn, tweetsLength)
  372. //expand sharedSecret so it is of right length
  373. expandBy := len(tweets) / 32
  374. expandedSharedSecrets := make([][]byte, 2)
  375. for i := 0; i < 2; i++ {
  376. for j := 0; j < expandBy; j++ {
  377. expandedSharedSecrets[i] = append(expandedSharedSecrets[i], sharedSecret[i][:]...)
  378. }
  379. }
  380. //xors the received messge into the message to display
  381. for i := 0; i < 2; i++ {
  382. lib.Xor(expandedSharedSecrets[i][:], tweets)
  383. }
  384. index := strings.Index(string(tweets), ";;")
  385. fmt.Println("received", string(tweets))
  386. if index != -1 {
  387. textArr := strings.Split(string(tweets), ";;;")
  388. text := textArr[:len(textArr)-1]
  389. if text[1] != "" {
  390. text[1] = text[1][1:]
  391. }
  392. ok := strings.Contains(text[0], text[1])
  393. if !ok {
  394. goodPadding++
  395. }
  396. if getArchive {
  397. if !strings.Contains(archiveTweets, text[0]) {
  398. archiveTweets += text[0] + ";"
  399. }
  400. } else {
  401. if !strings.Contains(mainTweets, text[0]) {
  402. mainTweets += text[0] + ";"
  403. }
  404. }
  405. } else if index == -1 && tweets[0] != 0 {
  406. fmt.Println("error")
  407. fmt.Println("round", round, string(tweets), "length", len(tweets))
  408. return
  409. }
  410. }
  411. }
  412. func GetTweets(whereFrom int) string {
  413. var tweets string
  414. var recTweets string
  415. if whereFrom == 0 {
  416. recTweets = mainTweets
  417. mainTweets = ""
  418. } else {
  419. recTweets = archiveTweets
  420. archiveTweets = ""
  421. fmt.Println("archive", recTweets)
  422. }
  423. tweetArr := strings.Split(recTweets, ";;")
  424. for _, str := range tweetArr {
  425. strArr := strings.Split(str, ";")
  426. tweets += "<li>" + strArr[0] + "<br>" + strArr[1] + "</li>"
  427. }
  428. fmt.Println("tweetsToPost", tweets)
  429. return tweets
  430. }
  431. //creates a shared secret for each server
  432. func createSharedSecret() [numClients][2][32]byte {
  433. var tmpSharedSecret [numClients][2][32]byte
  434. for i := 0; i < numClients; i++ {
  435. for j := 0; j < 2; j++ {
  436. _, err := rand.Read(tmpSharedSecret[i][j][:])
  437. if err != nil {
  438. panic("couldn't get randomness for sharedSecret!")
  439. }
  440. }
  441. }
  442. return tmpSharedSecret
  443. }
  444. func createAuditPIRQuery(clientNumber int) ([]byte, []byte) {
  445. //pirQuery [serverAmount][dbWriteSize]byte
  446. pirQuerys := make([][]byte, 2)
  447. for i := range pirQuerys {
  448. pirQuerys[i] = make([]byte, dbWriteSize)
  449. }
  450. //for leader
  451. //pirQuery will be filled with random bits
  452. for index := range pirQuerys[0] {
  453. bit := mr.Intn(2)
  454. pirQuerys[0][index] = byte(bit)
  455. }
  456. copy(pirQuerys[1], pirQuerys[0])
  457. //the positon the virtual address will be taken from
  458. pos := mr.Intn(dbWriteSize)
  459. pirQuerys[0][pos] = 1
  460. pirQuerys[1][pos] = 0
  461. //flattens the querys to be able to send them more efficently
  462. messagesFlattened := make([][]byte, 2)
  463. //adds the sharedSecret to the pirQuery
  464. for server := 0; server < 2; server++ {
  465. messagesFlattened[server] = append(messagesFlattened[server], sharedSecret[clientNumber][server][:]...)
  466. }
  467. for server := 0; server < 2; server++ {
  468. messagesFlattened[server] = append(messagesFlattened[server], pirQuerys[server][:]...)
  469. }
  470. var nonce [24]byte
  471. _, err := rand.Read(nonce[:])
  472. if err != nil {
  473. panic("couldn't get randomness for nonce!")
  474. }
  475. encryptedQueryLeader := box.Seal(nonce[:], messagesFlattened[0], &nonce, leaderPublicKey, clientPrivateKey[clientNumber])
  476. _, err = rand.Read(nonce[:])
  477. if err != nil {
  478. panic("couldn't get randomness for nonce!")
  479. }
  480. encryptedQueryFollower := box.Seal(nonce[:], messagesFlattened[1], &nonce, followerPublicKey, clientPrivateKey[clientNumber])
  481. return encryptedQueryLeader, encryptedQueryFollower
  482. }
  483. //generates a topicOfInterest array with random values
  484. func addFakeInterests(max int, topicsOfInterest []int, doAuditing bool) []int {
  485. tmpNeededSubscriptions := neededSubscriptions
  486. if tmpNeededSubscriptions > len(topicList) {
  487. tmpNeededSubscriptions = len(topicList)
  488. }
  489. faketopicsOfInterest := make([]int, tmpNeededSubscriptions)
  490. maxInt := max
  491. //fills the array with unique random ascending values ranging from 0 to max
  492. for i := 0; i < tmpNeededSubscriptions; i++ {
  493. faketopicsOfInterest[i] = mr.Intn(maxInt)
  494. for j := 0; j < i; j++ {
  495. if faketopicsOfInterest[i] == faketopicsOfInterest[j] {
  496. i--
  497. break
  498. }
  499. }
  500. }
  501. if doAuditing {
  502. sort.Ints(faketopicsOfInterest)
  503. return faketopicsOfInterest
  504. }
  505. //adds unique and new random numbers to topicOfInterests until length is satisfied
  506. for _, number := range faketopicsOfInterest {
  507. if !inList(number, topicsOfInterest) {
  508. topicsOfInterest = append(topicsOfInterest, number)
  509. }
  510. if len(topicsOfInterest) == tmpNeededSubscriptions {
  511. break
  512. }
  513. }
  514. sort.Ints(topicsOfInterest)
  515. return topicsOfInterest
  516. }
  517. func inList(number int, list []int) bool {
  518. for _, element := range list {
  519. if element == number {
  520. return true
  521. }
  522. }
  523. return false
  524. }
  525. func receiveTopicLists(leaderConn net.Conn) {
  526. for i := 0; i < 2; i++ {
  527. topicListLength := readFrom(leaderConn, 4)
  528. recTopicList := readFrom(leaderConn, byteToInt(topicListLength))
  529. var tmpTopicList []string
  530. arrayReader := bytes.NewReader(recTopicList[:])
  531. json.NewDecoder(arrayReader).Decode(&tmpTopicList)
  532. if i == 0 {
  533. topicList = tmpTopicList
  534. } else {
  535. archiveTopicList = tmpTopicList
  536. }
  537. }
  538. }
// SetGuiTweet stores tweet in the package-level guiTweet variable, where
// getGuiTweet picks it up and clears it again.
// The expected input is "text#topic1#topic2...": getGuiTweet splits it on "#".
func SetGuiTweet(tweet string) {
	guiTweet = tweet
}
  542. func getGuiTweet() []byte {
  543. if guiTweet == "" {
  544. tweet := make([]byte, dataLength)
  545. return tweet
  546. }
  547. var tweet []byte
  548. var topics []byte
  549. var text []byte
  550. tweetArray := strings.Split(guiTweet, "#")
  551. text = []byte(tweetArray[0])
  552. tweetArray = tweetArray[1:]
  553. for _, elem := range tweetArray {
  554. topics = append(topics, []byte(elem)[:]...)
  555. topics = append(topics, []byte(",")[0])
  556. }
  557. topics = topics[:len(topics)-1]
  558. tweet = append(tweet, topics...)
  559. tweet = append(tweet, []byte(";")[0])
  560. tweet = append(tweet, text...)
  561. tweet = append(tweet, []byte(";;")[:]...)
  562. //adds padding
  563. length := dataLength - len(tweet)
  564. padding := make([]byte, length)
  565. rand.Read(padding)
  566. tweet = append(tweet, padding...)
  567. publisherAmount++
  568. guiTweet = ""
  569. return tweet
  570. }
// getRealTweet builds a padded tweet of dataLength bytes from a recorded
// tweet data set on disk.
//
// The user ID is the line at index clientNumber of the userList file; the
// user's tweets are read line by line from userTweets/<userID>. The first
// tweet whose shifted timestamp lies strictly between timeBounds[round-1] and
// timeBounds[round] is used: its hashtags become the comma-separated topic
// list, followed by ";", a random number in [1, 9999], ";;" and random
// padding. publisherAmount is incremented for such a tweet.
//
// When no tweet matches (or in round 1) an all-zero message is returned.
// Panics if a file cannot be opened or no user ID is found.
//
// NOTE(review): paths are hard-coded to one developer's home directory.
// NOTE(review): timeBounds[round-1] is evaluated before the round == 1 check,
// so round 0 would index out of range - confirm round starts at 1.
// NOTE(review): scanner.Err() is never checked, and the Atoi and rand.Read
// errors are ignored.
func getRealTweet(clientNumber int) []byte {
	fUserList, err := os.Open("/home/simon/goCode/tweets/userList")
	if err != nil {
		panic(err)
	}
	defer fUserList.Close()

	// picks the line with index clientNumber as user ID
	currentLine := 0
	scanner := bufio.NewScanner(fUserList)
	userID := ""
	for scanner.Scan() {
		if currentLine == clientNumber {
			userID = scanner.Text()
			break
		}
		currentLine++
	}
	if userID == "" {
		panic("no userID picked")
	}

	fTweets, err := os.Open("/home/simon/goCode/tweets/userTweets/" + userID)
	if err != nil {
		panic(err)
	}
	defer fTweets.Close()
	scanner = bufio.NewScanner(fTweets)

	// the time window of the current round
	lowerBound := timeBounds[round-1]
	upperBound := timeBounds[round]
	var tweet []byte
	for scanner.Scan() {
		//skips round 1, cause of 90% publisher rate
		if round == 1 {
			break
		}
		// extracts the number between the first ": " and the following " \""
		// in the part of the line before `, "hashtags"`
		// NOTE(review): assumes every line has this shape; a line without
		// ": " makes lineArr[1] panic - confirm against the data set.
		lineArr := strings.Split(scanner.Text(), ", \"hashtags\"")
		lineArr = strings.Split(lineArr[0], ": ")
		lineArr = strings.Split(lineArr[1], " \"")
		timestamp, _ := strconv.Atoi(lineArr[0])
		//transforms timestamp to current time
		// presumably 1351742400 is the start of the recorded data set - confirm
		timestamp -= 1351742400
		timestamp += startTime
		if float64(timestamp) > lowerBound && float64(timestamp) < upperBound {
			// takes the text between `["` and `"]`, removes the commas and
			// splits the rest at the quotes
			lineArr = strings.Split(scanner.Text(), "[\"")
			line := lineArr[1]
			lineArr = strings.Split(line, "\"]")
			line = lineArr[0]
			lineArr = strings.Split(line, ",")
			line = strings.Join(lineArr, "")
			topicLine := strings.Split(line, "\"")
			var topics []byte
			for index, topic := range topicLine {
				// only the even-indexed parts are used as topics
				if index%2 == 1 {
					continue
				}
				// stops adding topics once they would exceed dataLength-10 bytes
				if len(topics)+len(topic) > dataLength-10 {
					break
				}
				topics = append(topics, []byte(topic)[:]...)
				topics = append(topics, []byte(",")[0])
			}
			if len(topics) == 0 {
				break
			}
			// drops the trailing comma
			topics = topics[:len(topics)-1]
			tweet = append(tweet, topics...)
			tweet = append(tweet, []byte(";")[0])
			// the text is a random number from 1 to 9999
			r := mr.New(mr.NewSource(time.Now().UnixNano()))
			num := r.Intn(10000)
			if num == 0 {
				num = 1
			}
			tweet = append(tweet, []byte(strconv.Itoa(num) + ";;")[:]...)
			//adds padding
			length := dataLength - len(tweet)
			padding := make([]byte, length)
			rand.Read(padding)
			tweet = append(tweet, padding...)
			publisherAmount++
			return tweet
		}
	}
	// nothing to publish this round: all-zero message
	tweet = make([]byte, dataLength)
	return tweet
}
  654. func getTimeBounds() {
  655. timeBounds = make([]float64, 10000)
  656. timeBounds[0] = float64(time.Now().Unix())
  657. for index := range timeBounds {
  658. if index == 0 {
  659. continue
  660. }
  661. timeBounds[index] = timeBounds[index-1] + speedUp*(3*maxTimePerRound.Seconds()+2)
  662. }
  663. }
  664. func getRandomTweet(clientNumber int) []byte {
  665. var tweet []byte
  666. r := mr.New(mr.NewSource(time.Now().UnixNano()))
  667. maxTopics := r.Intn(6)
  668. if maxTopics == 0 {
  669. maxTopics = 1
  670. }
  671. maxInt := 100
  672. topicNumbers := make([]int, maxTopics)
  673. //fills the array with unique random ascending values ranging from 0 to maxInt
  674. for i := 0; i < maxTopics; i++ {
  675. topicNumbers[i] = mr.Intn(maxInt)
  676. for j := 0; j < i; j++ {
  677. if topicNumbers[i] == topicNumbers[j] {
  678. i--
  679. break
  680. }
  681. }
  682. }
  683. sort.Ints(topicNumbers)
  684. var topics []byte
  685. topicIndex := 0
  686. for i := 0; i < len(topicNumbers)*2; i++ {
  687. if i%2 == 0 {
  688. topics = append(topics, byte(topicNumbers[topicIndex]))
  689. topicIndex++
  690. } else if i != (len(topicNumbers)*2)-1 {
  691. topics = append(topics, []byte(",")[0])
  692. }
  693. }
  694. topics = append(topics, []byte(";")[0])
  695. num := r.Intn(100)
  696. if num == 0 {
  697. num = 1
  698. }
  699. text := []byte(strconv.Itoa(num) + ";")
  700. tweet = append(tweet, topics...)
  701. tweet = append(tweet, text...)
  702. tweet = append(tweet, []byte(";")[0])
  703. //adds padding
  704. length := dataLength - len(tweet)
  705. padding := make([]byte, length)
  706. rand.Read(padding)
  707. tweet = append(tweet, padding...)
  708. return tweet
  709. }
  710. //sends the array to the connection
  711. func writeTo(connection net.Conn, array []byte) {
  712. remainingLength := len(array)
  713. for remainingLength > 0 {
  714. if remainingLength >= mtu {
  715. _, err := connection.Write(array[:mtu])
  716. if err != nil {
  717. panic(err)
  718. }
  719. array = array[mtu:]
  720. remainingLength -= mtu
  721. } else {
  722. _, err := connection.Write(array)
  723. if err != nil {
  724. panic(err)
  725. }
  726. remainingLength = 0
  727. }
  728. }
  729. }
  730. //reads an array which is returned and of size "size" from the connection
  731. func readFrom(connection net.Conn, size int) []byte {
  732. var array []byte
  733. remainingSize := size
  734. for remainingSize > 0 {
  735. var err error
  736. toAppend := make([]byte, mtu)
  737. if remainingSize > mtu {
  738. _, err = connection.Read(toAppend)
  739. array = append(array, toAppend...)
  740. remainingSize -= mtu
  741. } else {
  742. _, err = connection.Read(toAppend[:remainingSize])
  743. array = append(array, toAppend[:remainingSize]...)
  744. remainingSize = 0
  745. }
  746. if err != nil {
  747. panic(err)
  748. }
  749. }
  750. return array
  751. }
  752. func intToByte(myInt int) (retBytes []byte) {
  753. retBytes = make([]byte, 4)
  754. retBytes[3] = byte((myInt >> 24) & 0xff)
  755. retBytes[2] = byte((myInt >> 16) & 0xff)
  756. retBytes[1] = byte((myInt >> 8) & 0xff)
  757. retBytes[0] = byte(myInt & 0xff)
  758. return
  759. }
  760. func byteToInt(myBytes []byte) (x int) {
  761. x = int(myBytes[3])<<24 + int(myBytes[2])<<16 + int(myBytes[1])<<8 + int(myBytes[0])
  762. return
  763. }