client.go 25 KB


  1. package client
  2. /*
  3. #cgo CFLAGS: -O2
  4. #cgo LDFLAGS: -lcrypto -lm
  5. #include "../c/dpf.h"
  6. #include "../c/okvClient.h"
  7. #include "../c/dpf.c"
  8. #include "../c/okvClient.c"
  9. */
  10. import "C"
  11. import (
  12. "2PPS/lib"
  13. "bufio"
  14. "bytes"
  15. "crypto/rand"
  16. "crypto/sha256"
  17. "crypto/tls"
  18. "encoding/json"
  19. "fmt"
  20. "log"
  21. "math/big"
  22. mr "math/rand"
  23. "net"
  24. "os"
  25. "sort"
  26. "strconv"
  27. "strings"
  28. "sync"
  29. "time"
  30. "unsafe"
  31. "golang.org/x/crypto/nacl/box"
  32. )
  // tweet pairs a list of topics with a text.
  // NOTE(review): this type is not referenced anywhere in the visible part of
  // the file (the functions below build tweets as raw byte slices) - confirm it
  // is still needed.
  33. type tweet struct {
  34. Topics []string
  35. Text string
  36. }
  // Package-level configuration and state.
  // NOTE(review): main starts numClients goroutines that all read and write
  // these variables and no lock is visible in this file - confirm whether the
  // resulting races are acceptable for the simulation.
  //
  // leader is the address Client dials with tls.Dial.
  37. const leader string = "127.0.0.1:4441"
  38. //number of simulated clients started by main; it also sizes the per-client key and secret arrays. Needs to be changed at leader/client at the same time.
  39. const numClients = 1000
  40. //shell commands to raise the open-file limit (presumably needed for many connections):
  41. //mylimit=8000; sudo prlimit --nofile=$mylimit --pid $$; ulimit -n $mylimit
  42. //to be run in every terminal
  // dataLength is the size in bytes of every tweet block built by getGuiTweet, getRealTweet and getRandomTweet.
  43. const dataLength int = 256
  44. //Maximum Transport Unit: chunk size used by writeTo and readFrom
  45. const mtu int = 1100
  // dbWriteSize and round are received from the leader at the start of phase 1.
  46. var dbWriteSize int
  47. var round int
  // topicList and archiveTopicList are replaced by receiveTopicLists.
  48. var topicList []string
  49. var archiveTopicList []string
  // neededSubscriptions is received from the leader during setup.
  50. var neededSubscriptions int
  // publisherAmount counts non-empty tweets built in a round; the last client logs and resets it.
  51. var publisherAmount int
  // goodPadding and blocksReceived are counters updated by receiveTweets.
  52. var goodPadding int
  53. var blocksReceived int
  // timeBounds is filled by getTimeBounds and read by getRealTweet.
  54. var timeBounds []float64
  55. //gui variables
  // guiTweet holds the pending tweet set by SetGuiTweet and consumed by getGuiTweet.
  56. var guiTweet string
  // mainTweets and archiveTweets accumulate received tweets until GetTweets drains them.
  57. var mainTweets string
  58. var archiveTweets string
  // mainInterestsString and archiveInterestsString are set by UpdateInterests.
  59. var mainInterestsString []string
  60. var archiveInterestsString []string
  61. //this translates to a simulated round length of ~2h
  62. var speedUp float64 = 7200 / ((maxTimePerRound.Seconds()) * 3)
  63. var maxTimePerRound time.Duration = 2 * time.Second
  // startTime is received from the leader during setup.
  64. var startTime int
  // archiveInterests has length 1; only its length is used (sent to the leader and used as a block count).
  65. var archiveInterests = make([]int, 1)
  // sharedSecret holds, per client, one 32-byte secret for each of the two servers.
  66. var sharedSecret [numClients][2][32]byte = createSharedSecret()
  // wantsArchive is a one-byte flag sent to the leader in phase 3.
  67. var wantsArchive = make([]byte, 1)
  // Server public keys are received from the leader by Client; every client overwrites them.
  68. var leaderPublicKey *[32]byte
  69. var followerPublicKey *[32]byte
  // Per-client NaCl box key pairs generated at the start of Client.
  70. var clientPrivateKey [numClients]*[32]byte
  71. var clientPublicKey [numClients]*[32]byte
  72. func main() {
  73. f, err := os.OpenFile("evalDataClient", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
  74. if err != nil {
  75. log.Fatalf("error opening file: %v", err)
  76. }
  77. defer f.Close()
  78. log.SetOutput(f)
  79. log.Println("simulated Duration", speedUp)
  80. wg := &sync.WaitGroup{}
  81. for i := 0; i < numClients; i++ {
  82. wg.Add(1)
  83. go Client(i, f)
  84. time.Sleep(1 * time.Millisecond)
  85. }
  86. wg.Wait()
  87. }
  // Client runs one simulated client and never returns normally: after the
  // setup it loops forever, reading a phase byte from the leader and acting on
  // it. Any error ends in a panic.
  //
  // Setup: generate a NaCl box key pair, initialise the C cipher, connect to
  // the leader over TLS, receive both topic lists and both server public keys,
  // send the own public key, then receive neededSubscriptions and startTime.
  //
  // Phase 1 (write): receive dbWriteSize and round, obtain a virtual address
  // through an audit PIR query, build a DPF write query for the current tweet
  // with C.prepQuery and send both encrypted shares to the leader.
  // Phase 3 (read): depending on the sub-phase, refresh topic lists / shared
  // secret and send a PIR query, then receive tweets and optionally the archive.
  //
  // clientNumber indexes the per-client key and secret arrays. f is not used
  // inside this function.
  88. func Client(clientNumber int, f *os.File) {
  // only the first client computes the round time bounds
  89. if clientNumber == 0 {
  90. getTimeBounds()
  91. }
  92. generatedPublicKey, generatedPrivateKey, err := box.GenerateKey(rand.Reader)
  93. if err != nil {
  94. panic(err)
  95. }
  96. clientPrivateKey[clientNumber] = generatedPrivateKey
  97. clientPublicKey[clientNumber] = generatedPublicKey
  98. C.initializeCipher()
  99. //initializes the connection to the leader
  // NOTE(review): InsecureSkipVerify disables verification of the leader's
  // certificate - acceptable only for local testing.
  100. conf := &tls.Config{
  101. InsecureSkipVerify: true,
  102. }
  103. leaderConn, err := tls.Dial("tcp", leader, conf)
  104. if err != nil {
  105. panic(err)
  106. }
  // a zero time value means the connection never times out
  107. leaderConn.SetDeadline(time.Time{})
  108. //receives topics first so client can participate asap
  109. receiveTopicLists(leaderConn)
  110. //gets the public keys of both servers
  // NOTE(review): a single Read may return fewer than 32 bytes (io.Reader
  // contract); the byte count is discarded here - confirm a full read is
  // guaranteed or switch to readFrom.
  111. var tmpLeaderPubKey [32]byte
  112. _, err = leaderConn.Read(tmpLeaderPubKey[:])
  113. if err != nil {
  114. panic(err)
  115. }
  // the package-level key pointers are overwritten by every client
  116. leaderPublicKey = &tmpLeaderPubKey
  117. var tmpFollowerPubKey [32]byte
  118. _, err = leaderConn.Read(tmpFollowerPubKey[:])
  119. if err != nil {
  120. panic(err)
  121. }
  122. followerPublicKey = &tmpFollowerPubKey
  123. //sends own public key
  124. writeTo(leaderConn, clientPublicKey[clientNumber][:])
  125. neededSubscriptionsBytes := readFrom(leaderConn, 4)
  126. neededSubscriptions = byteToInt(neededSubscriptionsBytes)
  127. startTimeBytes := readFrom(leaderConn, 4)
  128. startTime = byteToInt(startTimeBytes)
  129. //setup ends above
  130. //while the client is active it stays connected and has to participate
  131. for {
  132. //gets current phase
  133. phase := readFrom(leaderConn, 1)
  134. if phase[0] == 1 {
  135. //gets current dbWriteSize from leader
  136. dbWriteSizeBytes := readFrom(leaderConn, 4)
  137. dbWriteSize = byteToInt(dbWriteSizeBytes)
  138. //roundAsBytes := readFrom(leaderConn, 4)
  // NOTE(review): same short-read concern as for the public keys above.
  139. roundAsBytes := make([]byte, 4)
  140. _, err = leaderConn.Read(roundAsBytes)
  141. if err != nil {
  142. panic(err)
  143. }
  144. round = byteToInt(roundAsBytes)
  145. if clientNumber == 0 {
  146. fmt.Println("Round ", round)
  147. }
  148. //request virtualAddress from leader via pirQuery
  149. encryptedQueryLeader, encryptedQueryFollower := createAuditPIRQuery(clientNumber)
  150. sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false)
  151. pos := receiveVirtualAddress(sharedSecret[clientNumber], leaderConn)
  152. tweet := getGuiTweet()
  153. //tweet := getRealTweet(clientNumber)
  // the last client logs the per-round statistics and resets the publisher counter
  154. if clientNumber == numClients-1 {
  155. log.Println("Round", round)
  156. log.Println("publisherAmount", publisherAmount)
  157. log.Println("goodPadding", goodPadding)
  158. log.Println("blocksReceived", blocksReceived)
  159. log.Println("goodPadding Percentage", float64(goodPadding)/float64(blocksReceived))
  160. publisherAmount = 0
  161. }
  162. //prep the query
  163. dataSize := len(tweet)
  // querySize is all zeros, so cQuerySize starts at 0; its address is handed
  // to C.prepQuery and the value is read back afterwards as the query length
  // (presumably prepQuery writes it - confirm in dpf/okvClient C sources).
  164. querySize := make([]byte, 4)
  165. cQuerySize := C.int(byteToInt(querySize))
  166. var dpfQueryA *C.uchar
  167. var dpfQueryB *C.uchar
  168. C.prepQuery(C.int(pos), C.int(dbWriteSize), (*C.uchar)(&tweet[0]), C.int(dataSize), &cQuerySize, &dpfQueryA, &dpfQueryB)
  169. intQuerySize := int(cQuerySize) //byteToInt(querySize)
  170. //write the query
  171. queryAPlaintext := C.GoBytes(unsafe.Pointer(dpfQueryA), C.int(intQuerySize))
  172. //encrypts queryA and appends it to message
  173. var nonce [24]byte
  174. //fill nonce with randomness
  175. _, err = rand.Read(nonce[:])
  176. if err != nil {
  177. panic("couldn't get randomness for nonce!")
  178. }
  // box.Seal appends the ciphertext to its first argument, so the result is nonce || ciphertext
  179. dpfQueryAEncrypted := box.Seal(nonce[:], queryAPlaintext, &nonce, leaderPublicKey, clientPrivateKey[clientNumber])
  180. //encrypts queryB and appends it to message
  181. queryBPlaintext := C.GoBytes(unsafe.Pointer(dpfQueryB), C.int(intQuerySize))
  182. //fill nonce with fresh randomness
  183. _, err = rand.Read(nonce[:])
  184. if err != nil {
  185. panic("couldn't get randomness for nonce!")
  186. }
  187. dpfQueryBEncrypted := box.Seal(nonce[:], queryBPlaintext, &nonce, followerPublicKey, clientPrivateKey[clientNumber])
  188. //writes the dpfQuery to the leader
  // only the length of share A is sent; both shares are sealed from plaintexts of equal length
  189. dpfLengthBytes := intToByte(len(dpfQueryAEncrypted))
  190. writeTo(leaderConn, dpfLengthBytes)
  191. writeTo(leaderConn, dpfQueryAEncrypted)
  192. writeTo(leaderConn, dpfQueryBEncrypted)
  // the query buffers were handed out by the C side (presumably malloc'ed in prepQuery) and are released here
  193. C.free(unsafe.Pointer(dpfQueryA))
  194. C.free(unsafe.Pointer(dpfQueryB))
  195. } else if phase[0] == 3 {
  196. /*
  197. possible Values
  198. 0 : new client
  199. leader expects sharedSecrets, expects pirQuery
  200. 1 : update needed
  201. leader sends topicList, performs local update of sharedSecret, expects pirQuery
  202. 2 : no update needed
  203. nothing
  204. */
  205. subPhase := readFrom(leaderConn, 1)
  206. var encryptedQueryLeader, encryptedQueryFollower []byte
  207. //first time participating
  208. if subPhase[0] == 0 {
  209. receiveTopicLists(leaderConn)
  210. encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(int(subPhase[0]), clientNumber)
  211. sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false)
  212. }
  213. //updates the topic list and what client is interested in
  214. if subPhase[0] == 1 {
  215. receiveTopicLists(leaderConn)
  216. //updates local secret by hashing it with SHA-256
  217. for index := 0; index < 2; index++ {
  218. sharedSecret[clientNumber][index] = sha256.Sum256(sharedSecret[clientNumber][index][:])
  219. }
  220. encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(int(subPhase[0]), clientNumber)
  221. sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false)
  222. }
  223. receiveTweets(sharedSecret[clientNumber], leaderConn, false, clientNumber)
  // the archive is requested whenever the archive topic list is non-empty
  224. if len(archiveTopicList) > 0 {
  225. wantsArchive[0] = 1
  226. } else {
  227. wantsArchive[0] = 0
  228. }
  229. writeTo(leaderConn, wantsArchive)
  // the second half of this condition repeats the check that set wantsArchive above
  230. if wantsArchive[0] == 1 && len(archiveTopicList) > 0 {
  231. encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(-1, clientNumber)
  232. sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, true)
  233. receiveTweets(sharedSecret[clientNumber], leaderConn, true, clientNumber)
  234. }
  235. } else {
  // any other phase byte is treated as a fatal protocol error
  236. fmt.Println("Phase", phase)
  237. panic("somethin went wrong")
  238. }
  239. }
  240. }
  241. //creates and sends the pirQuerys for each server
  242. func createPIRQuery(subPhase int, clientNumber int) ([]byte, []byte) {
  243. topicsOfInterest := interestsToInt(mainInterestsString, 0)
  244. //topicsOfInterest := make([]int, neededSubscriptions)
  245. if len(topicsOfInterest) == 0 {
  246. topicsOfInterest = make([]int, neededSubscriptions)
  247. topicsOfInterest[0] = 0
  248. }
  249. fmt.Println("tpcofinterest", topicsOfInterest)
  250. tmpNeededSubscriptions := neededSubscriptions
  251. if tmpNeededSubscriptions > len(topicList) {
  252. tmpNeededSubscriptions = len(topicList)
  253. }
  254. tmptopicsOfInterest := make([]int, len(topicsOfInterest))
  255. copy(tmptopicsOfInterest, topicsOfInterest)
  256. tmpTopicList := make([]string, len(topicList))
  257. copy(tmpTopicList, topicList)
  258. if wantsArchive[0] == 1 && subPhase == -1 {
  259. tmpNeededSubscriptions = len(archiveInterests)
  260. if tmpNeededSubscriptions > len(archiveTopicList) {
  261. tmpNeededSubscriptions = len(archiveTopicList)
  262. }
  263. tmptopicsOfInterest = interestsToInt(archiveInterestsString, 1)
  264. if len(tmptopicsOfInterest) == 0 {
  265. tmptopicsOfInterest = make([]int, neededSubscriptions)
  266. tmptopicsOfInterest[0] = 0
  267. }
  268. tmpTopicList = archiveTopicList
  269. }
  270. //creates fake topicsOfInterest if client is boooring
  271. if len(tmptopicsOfInterest) < tmpNeededSubscriptions && subPhase != -1 {
  272. tmptopicsOfInterest = addFakeInterests(len(tmpTopicList), tmptopicsOfInterest, false)
  273. }
  274. //pirQuery [topicsOfInterest][serverAmount][topicAmount]byte
  275. pirQuerys := make([][][]byte, len(tmptopicsOfInterest))
  276. for i := range pirQuerys {
  277. pirQuerys[i] = make([][]byte, 2)
  278. for j := range pirQuerys[i] {
  279. pirQuerys[i][j] = make([]byte, len(tmpTopicList))
  280. }
  281. }
  282. //for leader
  283. //pirQuery will be filled with random bits
  284. for topic := range tmptopicsOfInterest {
  285. for index := range tmpTopicList {
  286. bit, err := rand.Int(rand.Reader, big.NewInt(2))
  287. if err != nil {
  288. panic(err)
  289. }
  290. pirQuerys[topic][0][index] = byte(bit.Int64())
  291. }
  292. }
  293. tmptopicsOfInterestBytes := make([]byte, len(tmpTopicList))
  294. for index := range tmptopicsOfInterest {
  295. if tmptopicsOfInterest[index] == 1 {
  296. tmptopicsOfInterestBytes[index] = 1
  297. }
  298. }
  299. for topicIndex, topic := range tmptopicsOfInterest {
  300. for index := range tmpTopicList {
  301. if topic == index {
  302. if pirQuerys[topicIndex][0][index] == 1 {
  303. pirQuerys[topicIndex][1][index] = 0
  304. } else {
  305. pirQuerys[topicIndex][1][index] = 1
  306. }
  307. } else {
  308. if pirQuerys[topicIndex][0][index] == 0 {
  309. pirQuerys[topicIndex][1][index] = 0
  310. } else {
  311. pirQuerys[topicIndex][1][index] = 1
  312. }
  313. }
  314. }
  315. }
  316. //flattens the querys to be able to send them more efficently
  317. messagesFlattened := make([][]byte, 2)
  318. //adds the sharedSecret to the first pirQuery when first time participating
  319. if subPhase == 0 {
  320. for server := 0; server < 2; server++ {
  321. messagesFlattened[server] = append(messagesFlattened[server], sharedSecret[clientNumber][server][:]...)
  322. }
  323. }
  324. for server := range messagesFlattened {
  325. for topic := range pirQuerys {
  326. messagesFlattened[server] = append(messagesFlattened[server], pirQuerys[topic][server]...)
  327. }
  328. }
  329. var nonce [24]byte
  330. _, err := rand.Read(nonce[:])
  331. if err != nil {
  332. panic("couldn't get randomness for nonce!")
  333. }
  334. encryptedQueryLeader := box.Seal(nonce[:], messagesFlattened[0], &nonce, leaderPublicKey, clientPrivateKey[clientNumber])
  335. _, err = rand.Read(nonce[:])
  336. if err != nil {
  337. panic("couldn't get randomness for nonce!")
  338. }
  339. encryptedQueryFollower := box.Seal(nonce[:], messagesFlattened[1], &nonce, followerPublicKey, clientPrivateKey[clientNumber])
  340. return encryptedQueryLeader, encryptedQueryFollower
  341. }
  342. func sendQuerys(encryptedQueryLeader, encryptedQueryFollower []byte, leaderConn net.Conn, getArchive bool) {
  343. encryptedLength := len(encryptedQueryLeader)
  344. //sends the pirQuerysLength to the leader
  345. writeTo(leaderConn, intToByte(encryptedLength))
  346. //sends the pirQuerys to the leader
  347. writeTo(leaderConn, encryptedQueryLeader)
  348. writeTo(leaderConn, encryptedQueryFollower)
  349. if getArchive {
  350. writeTo(leaderConn, intToByte(len(archiveInterests)))
  351. }
  352. }
  353. func receiveVirtualAddress(sharedSecret [2][32]byte, leaderConn net.Conn) int {
  354. virtualAddressByte := readFrom(leaderConn, 4)
  355. //xores the sharedSecret
  356. for h := 0; h < 2; h++ {
  357. for i := 0; i < 4; i++ {
  358. virtualAddressByte[i] = virtualAddressByte[i] ^ sharedSecret[h][i]
  359. }
  360. }
  361. return byteToInt(virtualAddressByte)
  362. }
  // receiveTweets reads the tweet blocks the leader sends in the read phase,
  // unmasks them with the client's shared secrets and appends newly seen tweets
  // to mainTweets (or archiveTweets when getArchive is set).
  //
  // The number of blocks read is min(neededSubscriptions, len(topicList)), or
  // min(len(archiveInterests), len(archiveTopicList)) for the archive.
  // clientNumber is not used inside this function.
  363. func receiveTweets(sharedSecret [2][32]byte, leaderConn net.Conn, getArchive bool, clientNumber int) {
  364. tmpNeededSubscriptions := neededSubscriptions
  365. if tmpNeededSubscriptions > len(topicList) {
  366. tmpNeededSubscriptions = len(topicList)
  367. }
  368. if getArchive {
  369. tmpNeededSubscriptions = len(archiveInterests)
  370. if tmpNeededSubscriptions > len(archiveTopicList) {
  371. tmpNeededSubscriptions = len(archiveTopicList)
  372. }
  373. }
  374. for i := 0; i < tmpNeededSubscriptions; i++ {
  // only blocks of the main database are counted for the statistics
  375. if !getArchive {
  376. blocksReceived++
  377. }
  378. //client receives tweets: a 4-byte length followed by the payload
  379. tweetsLengthBytes := readFrom(leaderConn, 4)
  380. tweetsLength := byteToInt(tweetsLengthBytes)
  381. tweets := readFrom(leaderConn, tweetsLength)
  382. //expand sharedSecret so it is of the right length
  // each 32-byte secret is repeated len(tweets)/32 times; a remainder of the
  // payload is not covered (presumably payloads are multiples of 32 - confirm)
  383. expandBy := len(tweets) / 32
  384. expandedSharedSecrets := make([][]byte, 2)
  // the inner loops reuse the name i and shadow the block counter
  385. for i := 0; i < 2; i++ {
  386. for j := 0; j < expandBy; j++ {
  387. expandedSharedSecrets[i] = append(expandedSharedSecrets[i], sharedSecret[i][:]...)
  388. }
  389. }
  390. //xors the received message into the message to display
  // NOTE(review): lib.Xor is defined outside this file; presumably it XORs its
  // first argument into the second in place - confirm.
  391. for i := 0; i < 2; i++ {
  392. lib.Xor(expandedSharedSecrets[i][:], tweets)
  393. }
  394. index := strings.Index(string(tweets), ";;")
  395. if index != -1 {
  // NOTE(review): the presence check looks for ";;" but the split uses ";;;".
  // After dropping the last part, text[0] and text[1] are accessed without a
  // length check, which panics when fewer than two parts remain.
  396. textArr := strings.Split(string(tweets), ";;;")
  397. text := textArr[:len(textArr)-1]
  398. if text[1] != "" {
  399. text[1] = text[1][1:]
  400. }
  // goodPadding is incremented when text[0] does not contain text[1]
  401. ok := strings.Contains(text[0], text[1])
  402. if !ok {
  403. goodPadding++
  404. }
  405. fmt.Println("received", text[0])
  // tweets already present in the accumulated string are not added again
  406. if getArchive {
  407. if !strings.Contains(archiveTweets, text[0]) {
  408. archiveTweets += text[0] + ";"
  409. }
  410. } else {
  411. if !strings.Contains(mainTweets, text[0]) {
  412. mainTweets += text[0] + ";"
  413. }
  414. }
  // NOTE(review): tweets[0] panics for an empty payload; the return below
  // leaves any remaining blocks unread on the connection.
  415. } else if index == -1 && tweets[0] != 0 {
  416. fmt.Println("error")
  417. fmt.Println("round", round, string(tweets), "length", len(tweets))
  418. return
  419. }
  420. }
  421. }
  422. func GetTweets(whereFrom int) string {
  423. var tweets string
  424. var recTweets string
  425. if whereFrom == 0 {
  426. recTweets = mainTweets
  427. mainTweets = ""
  428. } else {
  429. recTweets = archiveTweets
  430. archiveTweets = ""
  431. fmt.Println("archive", recTweets)
  432. }
  433. tweetArr := strings.Split(recTweets, ";;")
  434. for _, str := range tweetArr {
  435. strArr := strings.Split(str, ";")
  436. tweets += "<li># " + strArr[0] + "<br>" + strArr[1] + "</li>"
  437. }
  438. return tweets
  439. }
  440. func GetTopicList(whereFrom int) string {
  441. var tmpTopicList []string
  442. var list string
  443. label1 := `<label for="`
  444. label2 := `">`
  445. label3 := `</label>`
  446. var box1 string
  447. box2 := `" value="`
  448. box3 := `" type="checkbox"/>`
  449. if whereFrom == 0 {
  450. tmpTopicList = topicList
  451. box1 = `<input name="mainTopic" id="`
  452. } else {
  453. tmpTopicList = archiveTopicList
  454. box1 = `<input name="archiveTopic" id="`
  455. }
  456. for _, topic := range tmpTopicList {
  457. list += label1 + topic + label2 + topic + label3 + box1 + topic + box2 + topic + box3 + "<br>"
  458. }
  459. return list
  460. }
  461. func interestsToInt(interests []string, whereFrom int) []int {
  462. var interestsInt []int
  463. var tmpTopicList []string
  464. if whereFrom == 0 {
  465. tmpTopicList = topicList
  466. } else {
  467. tmpTopicList = archiveTopicList
  468. }
  469. for _, topic := range interests {
  470. for index, str := range tmpTopicList {
  471. if topic == str {
  472. interestsInt = append(interestsInt, index)
  473. }
  474. }
  475. }
  476. return interestsInt
  477. }
  478. func UpdateInterests(interests []string, whereTo int) {
  479. if whereTo == 0 {
  480. mainInterestsString = interests
  481. } else {
  482. archiveInterestsString = interests
  483. }
  484. }
  485. //creates a shared secret for each server
  486. func createSharedSecret() [numClients][2][32]byte {
  487. var tmpSharedSecret [numClients][2][32]byte
  488. for i := 0; i < numClients; i++ {
  489. for j := 0; j < 2; j++ {
  490. _, err := rand.Read(tmpSharedSecret[i][j][:])
  491. if err != nil {
  492. panic("couldn't get randomness for sharedSecret!")
  493. }
  494. }
  495. }
  496. return tmpSharedSecret
  497. }
  498. func createAuditPIRQuery(clientNumber int) ([]byte, []byte) {
  499. //pirQuery [serverAmount][dbWriteSize]byte
  500. pirQuerys := make([][]byte, 2)
  501. for i := range pirQuerys {
  502. pirQuerys[i] = make([]byte, dbWriteSize)
  503. }
  504. //for leader
  505. //pirQuery will be filled with random bits
  506. for index := range pirQuerys[0] {
  507. bit := mr.Intn(2)
  508. pirQuerys[0][index] = byte(bit)
  509. }
  510. copy(pirQuerys[1], pirQuerys[0])
  511. //the positon the virtual address will be taken from
  512. pos := mr.Intn(dbWriteSize)
  513. pirQuerys[0][pos] = 1
  514. pirQuerys[1][pos] = 0
  515. //flattens the querys to be able to send them more efficently
  516. messagesFlattened := make([][]byte, 2)
  517. //adds the sharedSecret to the pirQuery
  518. for server := 0; server < 2; server++ {
  519. messagesFlattened[server] = append(messagesFlattened[server], sharedSecret[clientNumber][server][:]...)
  520. }
  521. for server := 0; server < 2; server++ {
  522. messagesFlattened[server] = append(messagesFlattened[server], pirQuerys[server][:]...)
  523. }
  524. var nonce [24]byte
  525. _, err := rand.Read(nonce[:])
  526. if err != nil {
  527. panic("couldn't get randomness for nonce!")
  528. }
  529. encryptedQueryLeader := box.Seal(nonce[:], messagesFlattened[0], &nonce, leaderPublicKey, clientPrivateKey[clientNumber])
  530. _, err = rand.Read(nonce[:])
  531. if err != nil {
  532. panic("couldn't get randomness for nonce!")
  533. }
  534. encryptedQueryFollower := box.Seal(nonce[:], messagesFlattened[1], &nonce, followerPublicKey, clientPrivateKey[clientNumber])
  535. return encryptedQueryLeader, encryptedQueryFollower
  536. }
  537. //generates a topicOfInterest array with random values
  538. func addFakeInterests(max int, topicsOfInterest []int, doAuditing bool) []int {
  539. tmpNeededSubscriptions := neededSubscriptions
  540. if tmpNeededSubscriptions > len(topicList) {
  541. tmpNeededSubscriptions = len(topicList)
  542. }
  543. faketopicsOfInterest := make([]int, tmpNeededSubscriptions)
  544. maxInt := max
  545. //fills the array with unique random ascending values ranging from 0 to max
  546. for i := 0; i < tmpNeededSubscriptions; i++ {
  547. faketopicsOfInterest[i] = mr.Intn(maxInt)
  548. for j := 0; j < i; j++ {
  549. if faketopicsOfInterest[i] == faketopicsOfInterest[j] {
  550. i--
  551. break
  552. }
  553. }
  554. }
  555. if doAuditing {
  556. sort.Ints(faketopicsOfInterest)
  557. return faketopicsOfInterest
  558. }
  559. //adds unique and new random numbers to topicOfInterests until length is satisfied
  560. for _, number := range faketopicsOfInterest {
  561. if !inList(number, topicsOfInterest) {
  562. topicsOfInterest = append(topicsOfInterest, number)
  563. }
  564. if len(topicsOfInterest) == tmpNeededSubscriptions {
  565. break
  566. }
  567. }
  568. sort.Ints(topicsOfInterest)
  569. return topicsOfInterest
  570. }
  // inList reports whether number occurs in list.
  func inList(number int, list []int) bool {
  	for i := 0; i < len(list); i++ {
  		if list[i] == number {
  			return true
  		}
  	}
  	return false
  }
  579. func receiveTopicLists(leaderConn net.Conn) {
  580. for i := 0; i < 2; i++ {
  581. topicListLength := readFrom(leaderConn, 4)
  582. recTopicList := readFrom(leaderConn, byteToInt(topicListLength))
  583. var tmpTopicList []string
  584. arrayReader := bytes.NewReader(recTopicList[:])
  585. json.NewDecoder(arrayReader).Decode(&tmpTopicList)
  586. if i == 0 {
  587. topicList = tmpTopicList
  588. } else {
  589. archiveTopicList = tmpTopicList
  590. }
  591. }
  592. }
  // SetGuiTweet stores tweet as the pending GUI tweet. getGuiTweet consumes it
  // and resets it to the empty string. The format getGuiTweet parses is
  // "text#topic1#topic2...".
  593. func SetGuiTweet(tweet string) {
  594. guiTweet = tweet
  595. }
  596. func getGuiTweet() []byte {
  597. if guiTweet == "" {
  598. tweet := make([]byte, dataLength)
  599. return tweet
  600. }
  601. var tweet []byte
  602. var topics []byte
  603. var text []byte
  604. tweetArray := strings.Split(guiTweet, "#")
  605. text = []byte(tweetArray[0])
  606. tweetArray = tweetArray[1:]
  607. for _, elem := range tweetArray {
  608. topics = append(topics, []byte(elem)[:]...)
  609. topics = append(topics, []byte(",")[0])
  610. }
  611. topics = topics[:len(topics)-1]
  612. tweet = append(tweet, topics...)
  613. tweet = append(tweet, []byte(";")[0])
  614. tweet = append(tweet, text...)
  615. tweet = append(tweet, []byte(";;")[:]...)
  616. //adds padding
  617. length := dataLength - len(tweet)
  618. padding := make([]byte, length)
  619. rand.Read(padding)
  620. tweet = append(tweet, padding...)
  621. publisherAmount++
  622. guiTweet = ""
  623. return tweet
  624. }
  // getRealTweet builds a tweet block of dataLength bytes from recorded data
  // on disk. The user ID is the line with index clientNumber of the userList
  // file; the user's tweets are read from a file named after that ID. The
  // first recorded tweet whose shifted timestamp lies strictly inside the time
  // bounds of the current round is turned into
  // "topic1,topic2;<random number>;;<random padding>". When no tweet matches
  // (and always in round 1) a zero block is returned.
  //
  // Panics when a file cannot be opened or no user ID exists for clientNumber.
  // NOTE(review): the paths are hard-coded to one developer machine, and
  // timeBounds[round-1] panics for round 0. The line parsing indexes the split
  // results without length checks, so it presumably relies on a fixed record
  // format - confirm against the data set.
  625. func getRealTweet(clientNumber int) []byte {
  626. fUserList, err := os.Open("/home/simon/goCode/tweets/userList")
  627. if err != nil {
  628. panic(err)
  629. }
  630. defer fUserList.Close()
  // scans to the line whose index equals clientNumber
  631. currentLine := 0
  632. scanner := bufio.NewScanner(fUserList)
  633. userID := ""
  634. for scanner.Scan() {
  635. if currentLine == clientNumber {
  636. userID = scanner.Text()
  637. break
  638. }
  639. currentLine++
  640. }
  641. if userID == "" {
  642. panic("no userID picked")
  643. }
  644. fTweets, err := os.Open("/home/simon/goCode/tweets/userTweets/" + userID)
  645. if err != nil {
  646. panic(err)
  647. }
  648. defer fTweets.Close()
  649. scanner = bufio.NewScanner(fTweets)
  // time window of the current round
  650. lowerBound := timeBounds[round-1]
  651. upperBound := timeBounds[round]
  652. var tweet []byte
  653. for scanner.Scan() {
  654. //skips round 1, because of the 90% publisher rate
  655. if round == 1 {
  656. break
  657. }
  // extracts the number between the first ": " and the following ` "` as timestamp;
  // a conversion error is ignored and yields 0
  658. lineArr := strings.Split(scanner.Text(), ", \"hashtags\"")
  659. lineArr = strings.Split(lineArr[0], ": ")
  660. lineArr = strings.Split(lineArr[1], " \"")
  661. timestamp, _ := strconv.Atoi(lineArr[0])
  662. //transforms timestamp to current time
  // 1351742400 is presumably the Unix time at which the recorded data starts - confirm
  663. timestamp -= 1351742400
  664. timestamp += startTime
  665. if float64(timestamp) > lowerBound && float64(timestamp) < upperBound {
  // takes the text between `["` and `"]`, removes the commas and splits at the quotes
  666. lineArr = strings.Split(scanner.Text(), "[\"")
  667. line := lineArr[1]
  668. lineArr = strings.Split(line, "\"]")
  669. line = lineArr[0]
  670. lineArr = strings.Split(line, ",")
  671. line = strings.Join(lineArr, "")
  672. topicLine := strings.Split(line, "\"")
  673. var topics []byte
  // only the parts with an even index are used as topics
  674. for index, topic := range topicLine {
  675. if index%2 == 1 {
  676. continue
  677. }
  // stops adding topics before they would exceed dataLength-10 bytes
  678. if len(topics)+len(topic) > dataLength-10 {
  679. break
  680. }
  681. topics = append(topics, []byte(topic)[:]...)
  682. topics = append(topics, []byte(",")[0])
  683. }
  684. if len(topics) == 0 {
  685. break
  686. }
  // drops the trailing comma
  687. topics = topics[:len(topics)-1]
  688. tweet = append(tweet, topics...)
  689. tweet = append(tweet, []byte(";")[0])
  // the tweet text is replaced by a random number from 1 to 9999
  690. r := mr.New(mr.NewSource(time.Now().UnixNano()))
  691. num := r.Intn(10000)
  692. if num == 0 {
  693. num = 1
  694. }
  695. tweet = append(tweet, []byte(strconv.Itoa(num) + ";;")[:]...)
  696. //adds padding
  697. length := dataLength - len(tweet)
  698. padding := make([]byte, length)
  // the error of rand.Read is not checked here
  699. rand.Read(padding)
  700. tweet = append(tweet, padding...)
  701. publisherAmount++
  702. return tweet
  703. }
  704. }
  // no matching tweet: an all-zero block
  705. tweet = make([]byte, dataLength)
  706. return tweet
  707. }
  708. func getTimeBounds() {
  709. timeBounds = make([]float64, 10000)
  710. timeBounds[0] = float64(time.Now().Unix())
  711. for index := range timeBounds {
  712. if index == 0 {
  713. continue
  714. }
  715. timeBounds[index] = timeBounds[index-1] + speedUp*(3*maxTimePerRound.Seconds()+2)
  716. }
  717. }
  718. func getRandomTweet(clientNumber int) []byte {
  719. var tweet []byte
  720. r := mr.New(mr.NewSource(time.Now().UnixNano()))
  721. maxTopics := r.Intn(6)
  722. if maxTopics == 0 {
  723. maxTopics = 1
  724. }
  725. maxInt := 100
  726. topicNumbers := make([]int, maxTopics)
  727. //fills the array with unique random ascending values ranging from 0 to maxInt
  728. for i := 0; i < maxTopics; i++ {
  729. topicNumbers[i] = mr.Intn(maxInt)
  730. for j := 0; j < i; j++ {
  731. if topicNumbers[i] == topicNumbers[j] {
  732. i--
  733. break
  734. }
  735. }
  736. }
  737. sort.Ints(topicNumbers)
  738. var topics []byte
  739. topicIndex := 0
  740. for i := 0; i < len(topicNumbers)*2; i++ {
  741. if i%2 == 0 {
  742. topics = append(topics, byte(topicNumbers[topicIndex]))
  743. topicIndex++
  744. } else if i != (len(topicNumbers)*2)-1 {
  745. topics = append(topics, []byte(",")[0])
  746. }
  747. }
  748. topics = append(topics, []byte(";")[0])
  749. num := r.Intn(100)
  750. if num == 0 {
  751. num = 1
  752. }
  753. text := []byte(strconv.Itoa(num) + ";")
  754. tweet = append(tweet, topics...)
  755. tweet = append(tweet, text...)
  756. tweet = append(tweet, []byte(";")[0])
  757. //adds padding
  758. length := dataLength - len(tweet)
  759. padding := make([]byte, length)
  760. rand.Read(padding)
  761. tweet = append(tweet, padding...)
  762. return tweet
  763. }
  764. //sends the array to the connection
  765. func writeTo(connection net.Conn, array []byte) {
  766. remainingLength := len(array)
  767. for remainingLength > 0 {
  768. if remainingLength >= mtu {
  769. _, err := connection.Write(array[:mtu])
  770. if err != nil {
  771. panic(err)
  772. }
  773. array = array[mtu:]
  774. remainingLength -= mtu
  775. } else {
  776. _, err := connection.Write(array)
  777. if err != nil {
  778. panic(err)
  779. }
  780. remainingLength = 0
  781. }
  782. }
  783. }
  // readFrom reads exactly size bytes from the connection and returns them.
  // For size <= 0 nothing is read and nil is returned.
  //
  // A single Read on a connection may deliver fewer bytes than requested, so
  // the function keeps reading until the buffer is full. Counting a short read
  // as a full one would return zero bytes in place of data and leave the rest
  // on the connection for the next call.
  //
  // Panics when the connection fails before size bytes have arrived.
  func readFrom(connection net.Conn, size int) []byte {
  	if size <= 0 {
  		return nil
  	}
  	array := make([]byte, size)
  	for filled := 0; filled < size; {
  		n, err := connection.Read(array[filled:])
  		filled += n
  		// an error that arrives together with the last missing bytes is left to the next call
  		if err != nil && filled < size {
  			panic(err)
  		}
  	}
  	return array
  }
  // intToByte encodes the low 32 bits of myInt as four bytes, least
  // significant byte first.
  func intToByte(myInt int) (retBytes []byte) {
  	retBytes = make([]byte, 4)
  	for i := range retBytes {
  		retBytes[i] = byte(myInt >> (8 * uint(i)))
  	}
  	return retBytes
  }
  // byteToInt decodes the first four bytes of myBytes as an integer, least
  // significant byte first. It panics when fewer than four bytes are given.
  func byteToInt(myBytes []byte) (x int) {
  	for i := 3; i >= 0; i-- {
  		x = x<<8 | int(myBytes[i])
  	}
  	return x
  }