client.go 23 KB


  1. package client
  2. /*
  3. #cgo CFLAGS: -O2
  4. #cgo LDFLAGS: -lcrypto -lm
  5. #include "../c/dpf.h"
  6. #include "../c/okvClient.h"
  7. #include "../c/dpf.c"
  8. #include "../c/okvClient.c"
  9. */
  10. import "C"
  11. import (
  12. "2PPS/lib"
  13. "bufio"
  14. "bytes"
  15. "crypto/rand"
  16. "crypto/sha256"
  17. "crypto/tls"
  18. "encoding/json"
  19. "fmt"
  20. "log"
  21. "math/big"
  22. mr "math/rand"
  23. "net"
  24. "os"
  25. "sort"
  26. "strconv"
  27. "strings"
  28. "sync"
  29. "time"
  30. "unsafe"
  31. "golang.org/x/crypto/nacl/box"
  32. )
  33. type tweet struct {
  34. Topics []string
  35. Text string
  36. }
  37. const leader string = "127.0.0.1:4441"
  38. //needs to be changed at leader/client at the same time
  39. const numClients = 1000
  40. //mylimit=8000
  41. //sudo prlimit --nofile=$mylimit --pid $$; ulimit -n $mylimit
  42. //for every terminal
  43. const dataLength int = 256
  44. //Maximum Transport Unit
  45. const mtu int = 1100
  46. var dbWriteSize int
  47. var round int
  48. var topicList []string
  49. var archiveTopicList []string
  50. var neededSubscriptions int
  51. var publisherAmount int
  52. var goodPadding int
  53. var blocksReceived int
  54. var timeBounds []float64
  55. var guiTweet string
  56. //this translates to a simulated round length of ~2h
  57. var speedUp float64 = 7200 / ((maxTimePerRound.Seconds()) * 3)
  58. var maxTimePerRound time.Duration = 5 * time.Second
  59. var startTime int
  60. var archiveInterests = make([]int, 1)
  61. var sharedSecret [numClients][2][32]byte = createSharedSecret()
  62. var wantsArchive = make([]byte, 1)
  63. var leaderPublicKey *[32]byte
  64. var followerPublicKey *[32]byte
  65. var clientPrivateKey [numClients]*[32]byte
  66. var clientPublicKey [numClients]*[32]byte
  67. func main() {
  68. f, err := os.OpenFile("evalDataClient", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
  69. if err != nil {
  70. log.Fatalf("error opening file: %v", err)
  71. }
  72. defer f.Close()
  73. log.SetOutput(f)
  74. log.Println("simulated Duration", speedUp)
  75. wg := &sync.WaitGroup{}
  76. for i := 0; i < numClients; i++ {
  77. wg.Add(1)
  78. go Client(i, f)
  79. time.Sleep(1 * time.Millisecond)
  80. }
  81. wg.Wait()
  82. }
  83. func Client(clientNumber int, f *os.File) {
  84. if clientNumber == 0 {
  85. getTimeBounds()
  86. }
  87. generatedPublicKey, generatedPrivateKey, err := box.GenerateKey(rand.Reader)
  88. if err != nil {
  89. panic(err)
  90. }
  91. clientPrivateKey[clientNumber] = generatedPrivateKey
  92. clientPublicKey[clientNumber] = generatedPublicKey
  93. C.initializeCipher()
  94. //initializes the connection to the leader
  95. conf := &tls.Config{
  96. InsecureSkipVerify: true,
  97. }
  98. leaderConn, err := tls.Dial("tcp", leader, conf)
  99. if err != nil {
  100. panic(err)
  101. }
  102. leaderConn.SetDeadline(time.Time{})
  103. //receives topics first so client can participate asap
  104. receiveTopicLists(leaderConn)
  105. //gets the public keys of both servers
  106. var tmpLeaderPubKey [32]byte
  107. _, err = leaderConn.Read(tmpLeaderPubKey[:])
  108. if err != nil {
  109. panic(err)
  110. }
  111. leaderPublicKey = &tmpLeaderPubKey
  112. var tmpFollowerPubKey [32]byte
  113. _, err = leaderConn.Read(tmpFollowerPubKey[:])
  114. if err != nil {
  115. panic(err)
  116. }
  117. followerPublicKey = &tmpFollowerPubKey
  118. //sends own public key
  119. writeTo(leaderConn, clientPublicKey[clientNumber][:])
  120. neededSubscriptionsBytes := readFrom(leaderConn, 4)
  121. neededSubscriptions = byteToInt(neededSubscriptionsBytes)
  122. startTimeBytes := readFrom(leaderConn, 4)
  123. startTime = byteToInt(startTimeBytes)
  124. //setup ends above
  125. //while client is active he is always connected and has to participate
  126. for {
  127. //gets current phase
  128. phase := readFrom(leaderConn, 1)
  129. if phase[0] == 1 {
  130. //gets current dbWriteSize from leader
  131. dbWriteSizeBytes := readFrom(leaderConn, 4)
  132. dbWriteSize = byteToInt(dbWriteSizeBytes)
  133. //roundAsBytes := readFrom(leaderConn, 4)
  134. roundAsBytes := make([]byte, 4)
  135. _, err = leaderConn.Read(roundAsBytes)
  136. if err != nil {
  137. panic(err)
  138. }
  139. round = byteToInt(roundAsBytes)
  140. if clientNumber == 0 {
  141. fmt.Println("Round ", round)
  142. }
  143. //request virtualAddress from leader via pirQuery
  144. encryptedQueryLeader, encryptedQueryFollower := createAuditPIRQuery(clientNumber)
  145. sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false)
  146. pos := receiveVirtualAddress(sharedSecret[clientNumber], leaderConn)
  147. tweet := getGuiTweet()
  148. if clientNumber == numClients-1 {
  149. log.Println("Round", round)
  150. log.Println("publisherAmount", publisherAmount)
  151. log.Println("goodPadding", goodPadding)
  152. log.Println("blocksReceived", blocksReceived)
  153. log.Println("goodPadding Percentage", float64(goodPadding)/float64(blocksReceived))
  154. publisherAmount = 0
  155. }
  156. //prep the query
  157. dataSize := len(tweet)
  158. querySize := make([]byte, 4)
  159. cQuerySize := C.int(byteToInt(querySize))
  160. var dpfQueryA *C.uchar
  161. var dpfQueryB *C.uchar
  162. C.prepQuery(C.int(pos), C.int(dbWriteSize), (*C.uchar)(&tweet[0]), C.int(dataSize), &cQuerySize, &dpfQueryA, &dpfQueryB)
  163. intQuerySize := int(cQuerySize) //byteToInt(querySize)
  164. //write the query
  165. queryAPlaintext := C.GoBytes(unsafe.Pointer(dpfQueryA), C.int(intQuerySize))
  166. //encrypts queryA and appends it to message
  167. var nonce [24]byte
  168. //fill nonce with randomness
  169. _, err = rand.Read(nonce[:])
  170. if err != nil {
  171. panic("couldn't get randomness for nonce!")
  172. }
  173. dpfQueryAEncrypted := box.Seal(nonce[:], queryAPlaintext, &nonce, leaderPublicKey, clientPrivateKey[clientNumber])
  174. //encrypts queryB and appends it to message
  175. queryBPlaintext := C.GoBytes(unsafe.Pointer(dpfQueryB), C.int(intQuerySize))
  176. //fill nonce with randomness
  177. _, err = rand.Read(nonce[:])
  178. if err != nil {
  179. panic("couldn't get randomness for nonce!")
  180. }
  181. dpfQueryBEncrypted := box.Seal(nonce[:], queryBPlaintext, &nonce, followerPublicKey, clientPrivateKey[clientNumber])
  182. //writes the dpfQuery to the leader
  183. dpfLengthBytes := intToByte(len(dpfQueryAEncrypted))
  184. writeTo(leaderConn, dpfLengthBytes)
  185. writeTo(leaderConn, dpfQueryAEncrypted)
  186. writeTo(leaderConn, dpfQueryBEncrypted)
  187. C.free(unsafe.Pointer(dpfQueryA))
  188. C.free(unsafe.Pointer(dpfQueryB))
  189. } else if phase[0] == 3 {
  190. /*
  191. possible Values
  192. 0 : new client
  193. leader expects sharedSecrets, expects pirQuery
  194. 1 : update needed
  195. leader sends topicList, performs local update of sharedSecret, expects pirQuery
  196. 2 : no update needed
  197. nothing
  198. */
  199. subPhase := readFrom(leaderConn, 1)
  200. var encryptedQueryLeader, encryptedQueryFollower []byte
  201. //first time participating
  202. if subPhase[0] == 0 {
  203. receiveTopicLists(leaderConn)
  204. encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(int(subPhase[0]), clientNumber)
  205. sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false)
  206. }
  207. //updates the topic list and what client is interested in
  208. if subPhase[0] == 1 {
  209. receiveTopicLists(leaderConn)
  210. //updates local secret
  211. for index := 0; index < 2; index++ {
  212. sharedSecret[clientNumber][index] = sha256.Sum256(sharedSecret[clientNumber][index][:])
  213. }
  214. encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(int(subPhase[0]), clientNumber)
  215. sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false)
  216. }
  217. receiveTweets(sharedSecret[clientNumber], leaderConn, false, clientNumber)
  218. if len(archiveTopicList) > 0 {
  219. wantsArchive[0] = 0
  220. } else {
  221. wantsArchive[0] = 0
  222. }
  223. writeTo(leaderConn, wantsArchive)
  224. if wantsArchive[0] == 1 && len(archiveTopicList) > 0 {
  225. encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(-1, clientNumber)
  226. sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, true)
  227. receiveTweets(sharedSecret[clientNumber], leaderConn, true, clientNumber)
  228. }
  229. } else {
  230. fmt.Println("Phase", phase)
  231. panic("somethin went wrong")
  232. }
  233. }
  234. }
  235. //creates and sends the pirQuerys for each server
  236. func createPIRQuery(subPhase int, clientNumber int) ([]byte, []byte) {
  237. //later this will be taken from gui, this is only for testing
  238. topicsOfInterest := make([]int, 1)
  239. topicsOfInterest[0] = mr.Intn(10)
  240. archiveInterests[0] = mr.Intn(10)
  241. tmpNeededSubscriptions := neededSubscriptions
  242. if tmpNeededSubscriptions > len(topicList) {
  243. tmpNeededSubscriptions = len(topicList)
  244. }
  245. tmptopicsOfInterest := make([]int, len(topicsOfInterest))
  246. copy(tmptopicsOfInterest, topicsOfInterest)
  247. tmpTopicList := make([]string, len(topicList))
  248. copy(tmpTopicList, topicList)
  249. if wantsArchive[0] == 1 && subPhase == -1 {
  250. tmpNeededSubscriptions = len(archiveInterests)
  251. if tmpNeededSubscriptions > len(archiveTopicList) {
  252. tmpNeededSubscriptions = len(archiveTopicList)
  253. }
  254. tmptopicsOfInterest = archiveInterests //todo! take archiveInterests from gui
  255. tmpTopicList = archiveTopicList
  256. }
  257. //creates fake topicsOfInterest if client is boooring
  258. if len(tmptopicsOfInterest) < tmpNeededSubscriptions && subPhase != -1 {
  259. tmptopicsOfInterest = addFakeInterests(len(tmpTopicList), tmptopicsOfInterest, false)
  260. }
  261. //pirQuery [topicsOfInterest][serverAmount][topicAmount]byte
  262. pirQuerys := make([][][]byte, len(tmptopicsOfInterest))
  263. for i := range pirQuerys {
  264. pirQuerys[i] = make([][]byte, 2)
  265. for j := range pirQuerys[i] {
  266. pirQuerys[i][j] = make([]byte, len(tmpTopicList))
  267. }
  268. }
  269. //for leader
  270. //pirQuery will be filled with random bits
  271. for topic := range tmptopicsOfInterest {
  272. for index := range tmpTopicList {
  273. bit, err := rand.Int(rand.Reader, big.NewInt(2))
  274. if err != nil {
  275. panic(err)
  276. }
  277. pirQuerys[topic][0][index] = byte(bit.Int64())
  278. }
  279. }
  280. tmptopicsOfInterestBytes := make([]byte, len(tmpTopicList))
  281. for index := range tmptopicsOfInterest {
  282. if tmptopicsOfInterest[index] == 1 {
  283. tmptopicsOfInterestBytes[index] = 1
  284. }
  285. }
  286. for topicIndex, topic := range tmptopicsOfInterest {
  287. for index := range tmpTopicList {
  288. if topic == index {
  289. if pirQuerys[topicIndex][0][index] == 1 {
  290. pirQuerys[topicIndex][1][index] = 0
  291. } else {
  292. pirQuerys[topicIndex][1][index] = 1
  293. }
  294. } else {
  295. if pirQuerys[topicIndex][0][index] == 0 {
  296. pirQuerys[topicIndex][1][index] = 0
  297. } else {
  298. pirQuerys[topicIndex][1][index] = 1
  299. }
  300. }
  301. }
  302. }
  303. //flattens the querys to be able to send them more efficently
  304. messagesFlattened := make([][]byte, 2)
  305. //adds the sharedSecret to the first pirQuery when first time participating
  306. if subPhase == 0 {
  307. for server := 0; server < 2; server++ {
  308. messagesFlattened[server] = append(messagesFlattened[server], sharedSecret[clientNumber][server][:]...)
  309. }
  310. }
  311. for server := range messagesFlattened {
  312. for topic := range pirQuerys {
  313. messagesFlattened[server] = append(messagesFlattened[server], pirQuerys[topic][server]...)
  314. }
  315. }
  316. var nonce [24]byte
  317. _, err := rand.Read(nonce[:])
  318. if err != nil {
  319. panic("couldn't get randomness for nonce!")
  320. }
  321. encryptedQueryLeader := box.Seal(nonce[:], messagesFlattened[0], &nonce, leaderPublicKey, clientPrivateKey[clientNumber])
  322. _, err = rand.Read(nonce[:])
  323. if err != nil {
  324. panic("couldn't get randomness for nonce!")
  325. }
  326. encryptedQueryFollower := box.Seal(nonce[:], messagesFlattened[1], &nonce, followerPublicKey, clientPrivateKey[clientNumber])
  327. return encryptedQueryLeader, encryptedQueryFollower
  328. }
  329. func sendQuerys(encryptedQueryLeader, encryptedQueryFollower []byte, leaderConn net.Conn, getArchive bool) {
  330. encryptedLength := len(encryptedQueryLeader)
  331. //sends the pirQuerysLength to the leader
  332. writeTo(leaderConn, intToByte(encryptedLength))
  333. //sends the pirQuerys to the leader
  334. writeTo(leaderConn, encryptedQueryLeader)
  335. writeTo(leaderConn, encryptedQueryFollower)
  336. if getArchive {
  337. writeTo(leaderConn, intToByte(len(archiveInterests)))
  338. }
  339. }
  340. func receiveVirtualAddress(sharedSecret [2][32]byte, leaderConn net.Conn) int {
  341. virtualAddressByte := readFrom(leaderConn, 4)
  342. //xores the sharedSecret
  343. for h := 0; h < 2; h++ {
  344. for i := 0; i < 4; i++ {
  345. virtualAddressByte[i] = virtualAddressByte[i] ^ sharedSecret[h][i]
  346. }
  347. }
  348. return byteToInt(virtualAddressByte)
  349. }
  350. func receiveTweets(sharedSecret [2][32]byte, leaderConn net.Conn, getArchive bool, clientNumber int) {
  351. tmpNeededSubscriptions := neededSubscriptions
  352. if tmpNeededSubscriptions > len(topicList) {
  353. tmpNeededSubscriptions = len(topicList)
  354. }
  355. if getArchive {
  356. tmpNeededSubscriptions = len(archiveInterests)
  357. if tmpNeededSubscriptions > len(archiveTopicList) {
  358. tmpNeededSubscriptions = len(archiveTopicList)
  359. }
  360. }
  361. for i := 0; i < tmpNeededSubscriptions; i++ {
  362. if !getArchive {
  363. blocksReceived++
  364. }
  365. //client receives tweets
  366. tweetsLengthBytes := readFrom(leaderConn, 4)
  367. tweetsLength := byteToInt(tweetsLengthBytes)
  368. tweets := readFrom(leaderConn, tweetsLength)
  369. //expand sharedSecret so it is of right length
  370. expandBy := len(tweets) / 32
  371. expandedSharedSecrets := make([][]byte, 2)
  372. for i := 0; i < 2; i++ {
  373. for j := 0; j < expandBy; j++ {
  374. expandedSharedSecrets[i] = append(expandedSharedSecrets[i], sharedSecret[i][:]...)
  375. }
  376. }
  377. //xors the received messge into the message to display
  378. for i := 0; i < 2; i++ {
  379. lib.Xor(expandedSharedSecrets[i][:], tweets)
  380. }
  381. index := strings.Index(string(tweets), ";;")
  382. if index != -1 {
  383. textArr := strings.Split(string(tweets), ";;;")
  384. text := textArr[:len(textArr)-1]
  385. if text[1] != "" {
  386. text[1] = text[1][1:]
  387. }
  388. ok := strings.Contains(text[0], text[1])
  389. if !ok {
  390. goodPadding++
  391. }
  392. } else if index == -1 && tweets[0] != 0 {
  393. fmt.Println("error")
  394. fmt.Println("round", round, string(tweets), "length", len(tweets))
  395. return
  396. }
  397. }
  398. }
  399. //creates a shared secret for each server
  400. func createSharedSecret() [numClients][2][32]byte {
  401. var tmpSharedSecret [numClients][2][32]byte
  402. for i := 0; i < numClients; i++ {
  403. for j := 0; j < 2; j++ {
  404. _, err := rand.Read(tmpSharedSecret[i][j][:])
  405. if err != nil {
  406. panic("couldn't get randomness for sharedSecret!")
  407. }
  408. }
  409. }
  410. return tmpSharedSecret
  411. }
  412. func createAuditPIRQuery(clientNumber int) ([]byte, []byte) {
  413. //pirQuery [serverAmount][dbWriteSize]byte
  414. pirQuerys := make([][]byte, 2)
  415. for i := range pirQuerys {
  416. pirQuerys[i] = make([]byte, dbWriteSize)
  417. }
  418. //for leader
  419. //pirQuery will be filled with random bits
  420. for index := range pirQuerys[0] {
  421. bit := mr.Intn(2)
  422. pirQuerys[0][index] = byte(bit)
  423. }
  424. copy(pirQuerys[1], pirQuerys[0])
  425. //the positon the virtual address will be taken from
  426. pos := mr.Intn(dbWriteSize)
  427. pirQuerys[0][pos] = 1
  428. pirQuerys[1][pos] = 0
  429. //flattens the querys to be able to send them more efficently
  430. messagesFlattened := make([][]byte, 2)
  431. //adds the sharedSecret to the pirQuery
  432. for server := 0; server < 2; server++ {
  433. messagesFlattened[server] = append(messagesFlattened[server], sharedSecret[clientNumber][server][:]...)
  434. }
  435. for server := 0; server < 2; server++ {
  436. messagesFlattened[server] = append(messagesFlattened[server], pirQuerys[server][:]...)
  437. }
  438. var nonce [24]byte
  439. _, err := rand.Read(nonce[:])
  440. if err != nil {
  441. panic("couldn't get randomness for nonce!")
  442. }
  443. encryptedQueryLeader := box.Seal(nonce[:], messagesFlattened[0], &nonce, leaderPublicKey, clientPrivateKey[clientNumber])
  444. _, err = rand.Read(nonce[:])
  445. if err != nil {
  446. panic("couldn't get randomness for nonce!")
  447. }
  448. encryptedQueryFollower := box.Seal(nonce[:], messagesFlattened[1], &nonce, followerPublicKey, clientPrivateKey[clientNumber])
  449. return encryptedQueryLeader, encryptedQueryFollower
  450. }
  451. //generates a topicOfInterest array with random values
  452. func addFakeInterests(max int, topicsOfInterest []int, doAuditing bool) []int {
  453. tmpNeededSubscriptions := neededSubscriptions
  454. if tmpNeededSubscriptions > len(topicList) {
  455. tmpNeededSubscriptions = len(topicList)
  456. }
  457. faketopicsOfInterest := make([]int, tmpNeededSubscriptions)
  458. maxInt := max
  459. //fills the array with unique random ascending values ranging from 0 to max
  460. for i := 0; i < tmpNeededSubscriptions; i++ {
  461. faketopicsOfInterest[i] = mr.Intn(maxInt)
  462. for j := 0; j < i; j++ {
  463. if faketopicsOfInterest[i] == faketopicsOfInterest[j] {
  464. i--
  465. break
  466. }
  467. }
  468. }
  469. if doAuditing {
  470. sort.Ints(faketopicsOfInterest)
  471. return faketopicsOfInterest
  472. }
  473. //adds unique and new random numbers to topicOfInterests until length is satisfied
  474. for _, number := range faketopicsOfInterest {
  475. if !inList(number, topicsOfInterest) {
  476. topicsOfInterest = append(topicsOfInterest, number)
  477. }
  478. if len(topicsOfInterest) == tmpNeededSubscriptions {
  479. break
  480. }
  481. }
  482. sort.Ints(topicsOfInterest)
  483. return topicsOfInterest
  484. }
  485. func inList(number int, list []int) bool {
  486. for _, element := range list {
  487. if element == number {
  488. return true
  489. }
  490. }
  491. return false
  492. }
  493. func receiveTopicLists(leaderConn net.Conn) {
  494. for i := 0; i < 2; i++ {
  495. topicListLength := readFrom(leaderConn, 4)
  496. recTopicList := readFrom(leaderConn, byteToInt(topicListLength))
  497. var tmpTopicList []string
  498. arrayReader := bytes.NewReader(recTopicList[:])
  499. json.NewDecoder(arrayReader).Decode(&tmpTopicList)
  500. if i == 0 {
  501. topicList = tmpTopicList
  502. } else {
  503. archiveTopicList = tmpTopicList
  504. }
  505. }
  506. }
  507. func SetGuiTweet(tweet string) {
  508. guiTweet = tweet
  509. }
  510. func getGuiTweet() []byte {
  511. if guiTweet == "" {
  512. tweet := make([]byte, dataLength)
  513. return tweet
  514. }
  515. var tweet []byte
  516. var topics []byte
  517. var text []byte
  518. tweetArray := strings.Split(guiTweet, "#")
  519. text = []byte(tweetArray[0])
  520. tweetArray = tweetArray[1:]
  521. for _, elem := range tweetArray {
  522. topics = append(topics, []byte(elem)[:]...)
  523. topics = append(topics, []byte(",")[0])
  524. }
  525. topics = topics[:len(topics)-1]
  526. tweet = append(tweet, topics...)
  527. tweet = append(tweet, []byte(";")[0])
  528. tweet = append(tweet, text...)
  529. tweet = append(tweet, []byte(";;")[:]...)
  530. //adds padding
  531. length := dataLength - len(tweet)
  532. padding := make([]byte, length)
  533. rand.Read(padding)
  534. tweet = append(tweet, padding...)
  535. publisherAmount++
  536. guiTweet = ""
  537. return tweet
  538. }
  539. func getRealTweet(clientNumber int) []byte {
  540. fUserList, err := os.Open("/home/simon/goCode/tweets/userList")
  541. if err != nil {
  542. panic(err)
  543. }
  544. defer fUserList.Close()
  545. currentLine := 0
  546. scanner := bufio.NewScanner(fUserList)
  547. userID := ""
  548. for scanner.Scan() {
  549. if currentLine == clientNumber {
  550. userID = scanner.Text()
  551. break
  552. }
  553. currentLine++
  554. }
  555. if userID == "" {
  556. panic("no userID picked")
  557. }
  558. fTweets, err := os.Open("/home/simon/goCode/tweets/userTweets/" + userID)
  559. if err != nil {
  560. panic(err)
  561. }
  562. defer fTweets.Close()
  563. scanner = bufio.NewScanner(fTweets)
  564. lowerBound := timeBounds[round-1]
  565. upperBound := timeBounds[round]
  566. var tweet []byte
  567. for scanner.Scan() {
  568. //skips round 1, cause of 90% publisher rate
  569. if round == 1 {
  570. break
  571. }
  572. lineArr := strings.Split(scanner.Text(), ", \"hashtags\"")
  573. lineArr = strings.Split(lineArr[0], ": ")
  574. lineArr = strings.Split(lineArr[1], " \"")
  575. timestamp, _ := strconv.Atoi(lineArr[0])
  576. //transforms timestamp to current time
  577. timestamp -= 1351742400
  578. timestamp += startTime
  579. if float64(timestamp) > lowerBound && float64(timestamp) < upperBound {
  580. lineArr = strings.Split(scanner.Text(), "[\"")
  581. line := lineArr[1]
  582. lineArr = strings.Split(line, "\"]")
  583. line = lineArr[0]
  584. lineArr = strings.Split(line, ",")
  585. line = strings.Join(lineArr, "")
  586. topicLine := strings.Split(line, "\"")
  587. var topics []byte
  588. for index, topic := range topicLine {
  589. if index%2 == 1 {
  590. continue
  591. }
  592. if len(topics)+len(topic) > dataLength-10 {
  593. break
  594. }
  595. topics = append(topics, []byte(topic)[:]...)
  596. topics = append(topics, []byte(",")[0])
  597. }
  598. if len(topics) == 0 {
  599. break
  600. }
  601. topics = topics[:len(topics)-1]
  602. tweet = append(tweet, topics...)
  603. tweet = append(tweet, []byte(";")[0])
  604. r := mr.New(mr.NewSource(time.Now().UnixNano()))
  605. num := r.Intn(10000)
  606. if num == 0 {
  607. num = 1
  608. }
  609. tweet = append(tweet, []byte(strconv.Itoa(num) + ";;")[:]...)
  610. //adds padding
  611. length := dataLength - len(tweet)
  612. padding := make([]byte, length)
  613. rand.Read(padding)
  614. tweet = append(tweet, padding...)
  615. publisherAmount++
  616. return tweet
  617. }
  618. }
  619. tweet = make([]byte, dataLength)
  620. return tweet
  621. }
  622. func getTimeBounds() {
  623. timeBounds = make([]float64, 10000)
  624. timeBounds[0] = float64(time.Now().Unix())
  625. for index := range timeBounds {
  626. if index == 0 {
  627. continue
  628. }
  629. timeBounds[index] = timeBounds[index-1] + speedUp*(3*maxTimePerRound.Seconds()+2)
  630. }
  631. }
  632. func getRandomTweet(clientNumber int) []byte {
  633. var tweet []byte
  634. r := mr.New(mr.NewSource(time.Now().UnixNano()))
  635. maxTopics := r.Intn(6)
  636. if maxTopics == 0 {
  637. maxTopics = 1
  638. }
  639. maxInt := 100
  640. topicNumbers := make([]int, maxTopics)
  641. //fills the array with unique random ascending values ranging from 0 to maxInt
  642. for i := 0; i < maxTopics; i++ {
  643. topicNumbers[i] = mr.Intn(maxInt)
  644. for j := 0; j < i; j++ {
  645. if topicNumbers[i] == topicNumbers[j] {
  646. i--
  647. break
  648. }
  649. }
  650. }
  651. sort.Ints(topicNumbers)
  652. var topics []byte
  653. topicIndex := 0
  654. for i := 0; i < len(topicNumbers)*2; i++ {
  655. if i%2 == 0 {
  656. topics = append(topics, byte(topicNumbers[topicIndex]))
  657. topicIndex++
  658. } else if i != (len(topicNumbers)*2)-1 {
  659. topics = append(topics, []byte(",")[0])
  660. }
  661. }
  662. topics = append(topics, []byte(";")[0])
  663. num := r.Intn(100)
  664. if num == 0 {
  665. num = 1
  666. }
  667. text := []byte(strconv.Itoa(num) + ";")
  668. tweet = append(tweet, topics...)
  669. tweet = append(tweet, text...)
  670. tweet = append(tweet, []byte(";")[0])
  671. //adds padding
  672. length := dataLength - len(tweet)
  673. padding := make([]byte, length)
  674. rand.Read(padding)
  675. tweet = append(tweet, padding...)
  676. return tweet
  677. }
  678. //sends the array to the connection
  679. func writeTo(connection net.Conn, array []byte) {
  680. remainingLength := len(array)
  681. for remainingLength > 0 {
  682. if remainingLength >= mtu {
  683. _, err := connection.Write(array[:mtu])
  684. if err != nil {
  685. panic(err)
  686. }
  687. array = array[mtu:]
  688. remainingLength -= mtu
  689. } else {
  690. _, err := connection.Write(array)
  691. if err != nil {
  692. panic(err)
  693. }
  694. remainingLength = 0
  695. }
  696. }
  697. }
  698. //reads an array which is returned and of size "size" from the connection
  699. func readFrom(connection net.Conn, size int) []byte {
  700. var array []byte
  701. remainingSize := size
  702. for remainingSize > 0 {
  703. var err error
  704. toAppend := make([]byte, mtu)
  705. if remainingSize > mtu {
  706. _, err = connection.Read(toAppend)
  707. array = append(array, toAppend...)
  708. remainingSize -= mtu
  709. } else {
  710. _, err = connection.Read(toAppend[:remainingSize])
  711. array = append(array, toAppend[:remainingSize]...)
  712. remainingSize = 0
  713. }
  714. if err != nil {
  715. panic(err)
  716. }
  717. }
  718. return array
  719. }
  720. func intToByte(myInt int) (retBytes []byte) {
  721. retBytes = make([]byte, 4)
  722. retBytes[3] = byte((myInt >> 24) & 0xff)
  723. retBytes[2] = byte((myInt >> 16) & 0xff)
  724. retBytes[1] = byte((myInt >> 8) & 0xff)
  725. retBytes[0] = byte(myInt & 0xff)
  726. return
  727. }
  728. func byteToInt(myBytes []byte) (x int) {
  729. x = int(myBytes[3])<<24 + int(myBytes[2])<<16 + int(myBytes[1])<<8 + int(myBytes[0])
  730. return
  731. }