client.go 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852
  1. package main
  2. /*
  3. #cgo CFLAGS: -O2
  4. #cgo LDFLAGS: -lcrypto -lm
  5. #include "../c/dpf.h"
  6. #include "../c/okvClient.h"
  7. #include "../c/dpf.c"
  8. #include "../c/okvClient.c"
  9. */
  10. import "C"
  11. //sssssssssssss
  12. import (
  13. "2PPS/lib"
  14. "bufio"
  15. "bytes"
  16. "crypto/rand"
  17. "crypto/sha256"
  18. "crypto/tls"
  19. "encoding/json"
  20. "fmt"
  21. "math/big"
  22. mr "math/rand"
  23. "net"
  24. "os"
  25. "sort"
  26. "strconv"
  27. "strings"
  28. "sync"
  29. "time"
  30. "unsafe"
  31. "golang.org/x/crypto/nacl/box"
  32. )
  33. type tweet struct {
  34. Topics []string
  35. Text string
  36. }
  37. const leader string = "127.0.0.1:4441"
  38. //needs to be changed at leader/follower/client at the same time
  39. const numClients = 500
  40. //mylimit=8000
  41. //sudo prlimit --nofile=$mylimit --pid $$; ulimit -n $mylimit
  42. const dataLength int = 256
  43. //Maximum Transport Unit
  44. const mtu int = 1100
  45. var dbWriteSize int
  46. var round int
  47. var topicList []string
  48. var archiveTopicList []string
  49. var neededSubscriptions int
  50. var publisherAmount int
  51. var timeBounds []int
  52. var speedUp int = 500
  53. var maxTimePerRound time.Duration = 10 * time.Second
  54. var startTime int
  55. //todo! expand this for multiple clients
  56. var archiveInterests = make([]int, 1)
  57. var sharedSecret [numClients][2][32]byte = createSharedSecret()
  58. var wantsArchive = make([]byte, 1)
  59. var leaderPublicKey *[32]byte
  60. var followerPublicKey *[32]byte
  61. var clientPrivateKey [numClients]*[32]byte
  62. var clientPublicKey [numClients]*[32]byte
  63. func main() {
  64. wg := &sync.WaitGroup{}
  65. getTimeBounds()
  66. for i := 0; i < numClients; i++ {
  67. wg.Add(1)
  68. go client(i)
  69. time.Sleep(1 * time.Millisecond)
  70. }
  71. wg.Wait()
  72. }
  73. func client(clientNumber int) {
  74. generatedPublicKey, generatedPrivateKey, err := box.GenerateKey(rand.Reader)
  75. if err != nil {
  76. panic(err)
  77. }
  78. clientPrivateKey[clientNumber] = generatedPrivateKey
  79. clientPublicKey[clientNumber] = generatedPublicKey
  80. C.initializeCipher()
  81. //initializes the connection to the leader
  82. conf := &tls.Config{
  83. InsecureSkipVerify: true,
  84. }
  85. leaderConn, err := tls.Dial("tcp", leader, conf)
  86. if err != nil {
  87. fmt.Println("clientNumber", clientNumber)
  88. panic(err)
  89. }
  90. leaderConn.SetDeadline(time.Time{})
  91. //receives topics first so client can participate asap
  92. receiveTopicLists(leaderConn)
  93. //gets the public keys of both servers
  94. var tmpLeaderPubKey [32]byte
  95. _, err = leaderConn.Read(tmpLeaderPubKey[:])
  96. if err != nil {
  97. panic(err)
  98. }
  99. leaderPublicKey = &tmpLeaderPubKey
  100. var tmpFollowerPubKey [32]byte
  101. _, err = leaderConn.Read(tmpFollowerPubKey[:])
  102. if err != nil {
  103. panic(err)
  104. }
  105. followerPublicKey = &tmpFollowerPubKey
  106. //sends own public key
  107. writeTo(leaderConn, clientPublicKey[clientNumber][:])
  108. neededSubscriptionsBytes := readFrom(leaderConn, 4)
  109. neededSubscriptions = byteToInt(neededSubscriptionsBytes)
  110. startTimeBytes := readFrom(leaderConn, 4)
  111. startTime = byteToInt(startTimeBytes)
  112. //setup ends above
  113. //while client is active he is always connected and has to participate
  114. for {
  115. //gets current phase
  116. phase := readFrom(leaderConn, 1)
  117. if phase[0] == 1 {
  118. //gets current dbWriteSize from leader
  119. dbWriteSizeBytes := readFrom(leaderConn, 4)
  120. dbWriteSize = byteToInt(dbWriteSizeBytes)
  121. //todo! put into tweet creation
  122. //roundAsBytes := readFrom(leaderConn, 4)
  123. roundAsBytes := make([]byte, 4)
  124. _, err = leaderConn.Read(roundAsBytes)
  125. if err != nil {
  126. panic(err)
  127. }
  128. round = byteToInt(roundAsBytes)
  129. if clientNumber == 0 {
  130. fmt.Println("Round ", round)
  131. }
  132. //request virtualAddress from leader via pirQuery
  133. encryptedQueryLeader, encryptedQueryFollower := createAuditPIRQuery(clientNumber)
  134. sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false)
  135. pos := receiveVirtualAddress(sharedSecret[clientNumber], leaderConn)
  136. tweet := getRealTweet(clientNumber)
  137. if clientNumber == numClients-1 {
  138. fmt.Println("publisherAmount", publisherAmount)
  139. }
  140. //prep the query
  141. dataSize := len(tweet)
  142. querySize := make([]byte, 4)
  143. cQuerySize := C.int(byteToInt(querySize))
  144. var dpfQueryA *C.uchar
  145. var dpfQueryB *C.uchar
  146. C.prepQuery(C.int(pos), C.int(dbWriteSize), (*C.uchar)(&tweet[0]), C.int(dataSize), &cQuerySize, &dpfQueryA, &dpfQueryB)
  147. intQuerySize := int(cQuerySize) //byteToInt(querySize)
  148. //write the query
  149. queryAPlaintext := C.GoBytes(unsafe.Pointer(dpfQueryA), C.int(intQuerySize))
  150. //encrypts queryA and appends it to message
  151. var nonce [24]byte
  152. //fill nonce with randomness
  153. _, err = rand.Read(nonce[:])
  154. if err != nil {
  155. panic("couldn't get randomness for nonce!")
  156. }
  157. dpfQueryAEncrypted := box.Seal(nonce[:], queryAPlaintext, &nonce, leaderPublicKey, clientPrivateKey[clientNumber])
  158. //encrypts queryB and appends it to message
  159. queryBPlaintext := C.GoBytes(unsafe.Pointer(dpfQueryB), C.int(intQuerySize))
  160. //fill nonce with randomness
  161. _, err = rand.Read(nonce[:])
  162. if err != nil {
  163. panic("couldn't get randomness for nonce!")
  164. }
  165. dpfQueryBEncrypted := box.Seal(nonce[:], queryBPlaintext, &nonce, followerPublicKey, clientPrivateKey[clientNumber])
  166. //writes the dpfQuery to the leader
  167. dpfLengthBytes := intToByte(len(dpfQueryAEncrypted))
  168. writeTo(leaderConn, dpfLengthBytes)
  169. writeTo(leaderConn, dpfQueryAEncrypted)
  170. writeTo(leaderConn, dpfQueryBEncrypted)
  171. C.free(unsafe.Pointer(dpfQueryA))
  172. C.free(unsafe.Pointer(dpfQueryB))
  173. } else if phase[0] == 3 {
  174. /*
  175. possible Values
  176. 0 : new client
  177. leader expects sharedSecrets, expects pirQuery
  178. 1 : update needed
  179. leader sends topicList, performs local update of sharedSecret, expects pirQuery
  180. 2 : no update needed
  181. nothing
  182. */
  183. subPhase := readFrom(leaderConn, 1)
  184. var encryptedQueryLeader, encryptedQueryFollower []byte
  185. //first time participating
  186. if subPhase[0] == 0 {
  187. receiveTopicLists(leaderConn)
  188. encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(int(subPhase[0]), clientNumber)
  189. sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false)
  190. }
  191. //updates the topic list and what client is interested in
  192. if subPhase[0] == 1 {
  193. receiveTopicLists(leaderConn)
  194. //updates local secret
  195. for index := 0; index < 2; index++ {
  196. sharedSecret[clientNumber][index] = sha256.Sum256(sharedSecret[clientNumber][index][:])
  197. }
  198. encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(int(subPhase[0]), clientNumber)
  199. sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false)
  200. }
  201. receiveTweets(sharedSecret[clientNumber], leaderConn, false, clientNumber)
  202. if len(archiveTopicList) > 0 {
  203. wantsArchive[0] = 0 //archive test
  204. } else {
  205. wantsArchive[0] = 0
  206. }
  207. writeTo(leaderConn, wantsArchive)
  208. if wantsArchive[0] == 1 && len(archiveTopicList) > 0 {
  209. encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(-1, clientNumber)
  210. sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, true)
  211. receiveTweets(sharedSecret[clientNumber], leaderConn, true, clientNumber)
  212. }
  213. } else {
  214. fmt.Println("Phase", phase)
  215. panic("somethin went wrong")
  216. }
  217. }
  218. }
  219. //creates and sends the pirQuerys for each server
  220. func createPIRQuery(subPhase int, clientNumber int) ([]byte, []byte) {
  221. //later this will be taken from gui, this is only for testing
  222. topicsOfInterest := make([]int, 1)
  223. topicsOfInterest[0] = mr.Intn(10)
  224. //todo! repeat for archive
  225. tmpNeededSubscriptions := neededSubscriptions
  226. if tmpNeededSubscriptions > len(topicList) {
  227. tmpNeededSubscriptions = len(topicList)
  228. }
  229. tmptopicsOfInterest := make([]int, len(topicsOfInterest))
  230. copy(tmptopicsOfInterest, topicsOfInterest)
  231. tmpTopicList := make([]string, len(topicList))
  232. copy(tmpTopicList, topicList)
  233. if wantsArchive[0] == 1 && subPhase == -1 {
  234. tmpNeededSubscriptions = len(archiveInterests)
  235. if tmpNeededSubscriptions > len(archiveTopicList) {
  236. tmpNeededSubscriptions = len(archiveTopicList)
  237. }
  238. copy(tmptopicsOfInterest, archiveInterests) //archiveInterests from gui
  239. copy(tmpTopicList, archiveTopicList)
  240. }
  241. //creates fake topicsOfInterest if client is boooring
  242. if len(tmptopicsOfInterest) < tmpNeededSubscriptions && subPhase != -1 {
  243. tmptopicsOfInterest = addFakeInterests(len(tmpTopicList), tmptopicsOfInterest, false)
  244. }
  245. //pirQuery [topicsOfInterest][serverAmount][topicAmount]byte
  246. pirQuerys := make([][][]byte, len(tmptopicsOfInterest))
  247. for i := range pirQuerys {
  248. pirQuerys[i] = make([][]byte, 2)
  249. for j := range pirQuerys[i] {
  250. pirQuerys[i][j] = make([]byte, len(tmpTopicList))
  251. }
  252. }
  253. //for leader
  254. //pirQuery will be filled with random bits
  255. for topic := range tmptopicsOfInterest {
  256. for index := range tmpTopicList {
  257. bit, err := rand.Int(rand.Reader, big.NewInt(2))
  258. if err != nil {
  259. panic(err)
  260. }
  261. pirQuerys[topic][0][index] = byte(bit.Int64())
  262. }
  263. }
  264. tmptopicsOfInterestBytes := make([]byte, len(tmpTopicList))
  265. for index := range tmptopicsOfInterest {
  266. if tmptopicsOfInterest[index] == 1 {
  267. tmptopicsOfInterestBytes[index] = 1
  268. }
  269. }
  270. for topicIndex, topic := range tmptopicsOfInterest {
  271. for index := range tmpTopicList {
  272. if topic == index {
  273. if pirQuerys[topicIndex][0][index] == 1 {
  274. pirQuerys[topicIndex][1][index] = 0
  275. } else {
  276. pirQuerys[topicIndex][1][index] = 1
  277. }
  278. } else {
  279. if pirQuerys[topicIndex][0][index] == 0 {
  280. pirQuerys[topicIndex][1][index] = 0
  281. } else {
  282. pirQuerys[topicIndex][1][index] = 1
  283. }
  284. }
  285. }
  286. }
  287. //flattens the querys to be able to send them more efficently
  288. messagesFlattened := make([][]byte, 2)
  289. //adds the sharedSecret to the first pirQuery when first time participating
  290. if subPhase == 0 {
  291. for server := 0; server < 2; server++ {
  292. messagesFlattened[server] = append(messagesFlattened[server], sharedSecret[clientNumber][server][:]...)
  293. }
  294. }
  295. for server := range messagesFlattened {
  296. for topic := range pirQuerys {
  297. messagesFlattened[server] = append(messagesFlattened[server], pirQuerys[topic][server]...)
  298. }
  299. }
  300. var nonce [24]byte
  301. _, err := rand.Read(nonce[:])
  302. if err != nil {
  303. panic("couldn't get randomness for nonce!")
  304. }
  305. encryptedQueryLeader := box.Seal(nonce[:], messagesFlattened[0], &nonce, leaderPublicKey, clientPrivateKey[clientNumber])
  306. _, err = rand.Read(nonce[:])
  307. if err != nil {
  308. panic("couldn't get randomness for nonce!")
  309. }
  310. encryptedQueryFollower := box.Seal(nonce[:], messagesFlattened[1], &nonce, followerPublicKey, clientPrivateKey[clientNumber])
  311. return encryptedQueryLeader, encryptedQueryFollower
  312. }
  313. func sendQuerys(encryptedQueryLeader, encryptedQueryFollower []byte, leaderConn net.Conn, getArchive bool) {
  314. encryptedLength := len(encryptedQueryLeader)
  315. //sends the pirQuerysLength to the leader
  316. writeTo(leaderConn, intToByte(encryptedLength))
  317. //sends the pirQuerys to the leader
  318. writeTo(leaderConn, encryptedQueryLeader)
  319. writeTo(leaderConn, encryptedQueryFollower)
  320. if getArchive {
  321. writeTo(leaderConn, intToByte(len(archiveInterests)))
  322. }
  323. }
  324. func receiveVirtualAddress(sharedSecret [2][32]byte, leaderConn net.Conn) int {
  325. virtualAddressByte := readFrom(leaderConn, 4)
  326. //xores the sharedSecret
  327. for h := 0; h < 2; h++ {
  328. for i := 0; i < 4; i++ {
  329. virtualAddressByte[i] = virtualAddressByte[i] ^ sharedSecret[h][i]
  330. }
  331. }
  332. return byteToInt(virtualAddressByte)
  333. }
  334. func receiveTweets(sharedSecret [2][32]byte, leaderConn net.Conn, getArchive bool, clientNumber int) {
  335. tmpNeededSubscriptions := neededSubscriptions
  336. if tmpNeededSubscriptions > len(topicList) {
  337. tmpNeededSubscriptions = len(topicList)
  338. }
  339. if getArchive {
  340. tmpNeededSubscriptions = len(archiveInterests)
  341. if tmpNeededSubscriptions > len(archiveTopicList) {
  342. tmpNeededSubscriptions = len(archiveTopicList)
  343. }
  344. }
  345. for i := 0; i < tmpNeededSubscriptions; i++ {
  346. //client receives tweets
  347. tweetsLengthBytes := readFrom(leaderConn, 4)
  348. tweetsLength := byteToInt(tweetsLengthBytes)
  349. tweets := readFrom(leaderConn, tweetsLength)
  350. //fmt.Println(tweets[:10])
  351. //expand sharedSecret so it is of right length
  352. expandBy := len(tweets) / 32
  353. expandedSharedSecrets := make([][]byte, 2)
  354. for i := 0; i < 2; i++ {
  355. for j := 0; j < expandBy; j++ {
  356. expandedSharedSecrets[i] = append(expandedSharedSecrets[i], sharedSecret[i][:]...)
  357. }
  358. }
  359. //xors the received messge into the message to display
  360. for i := 0; i < 2; i++ {
  361. lib.Xor(expandedSharedSecrets[i][:], tweets)
  362. }
  363. //fmt.Println("PubKey", clientPublicKey[clientNumber], "Bytes", tweets)
  364. index := strings.Index(string(tweets), ";;")
  365. if index != -1 && clientNumber == 0 {
  366. //fmt.Println("Correct")
  367. textArr := strings.Split(string(tweets), ";;;")
  368. text := textArr[:len(textArr)-1]
  369. fmt.Println("Round", round, text[0], "Length", len(tweets))
  370. fmt.Println("padding", text[1])
  371. /* doesnt work
  372. goodPadding := strings.Index(text[0], text[1])
  373. if text[0] == text[1] || text[1] == "" {
  374. goodPadding = -1
  375. }
  376. if goodPadding != -1 {
  377. fmt.Println("Round", round, text[0], "Length", len(tweets))
  378. fmt.Println("padding", text[1])
  379. }
  380. */
  381. } else if index == -1 {
  382. fmt.Println("error")
  383. fmt.Println("round", round, string(tweets), "length", len(tweets))
  384. return
  385. //panic("received text not of correct format")
  386. }
  387. }
  388. }
  389. //creates a shared secret for each server
  390. func createSharedSecret() [numClients][2][32]byte {
  391. var tmpSharedSecret [numClients][2][32]byte
  392. for i := 0; i < numClients; i++ {
  393. for j := 0; j < 2; j++ {
  394. _, err := rand.Read(tmpSharedSecret[i][j][:])
  395. if err != nil {
  396. panic("couldn't get randomness for sharedSecret!")
  397. }
  398. }
  399. }
  400. return tmpSharedSecret
  401. }
  402. func createAuditPIRQuery(clientNumber int) ([]byte, []byte) {
  403. //pirQuery [serverAmount][dbWriteSize]byte
  404. pirQuerys := make([][]byte, 2)
  405. for i := range pirQuerys {
  406. pirQuerys[i] = make([]byte, dbWriteSize)
  407. }
  408. //for leader
  409. //pirQuery will be filled with random bits
  410. for index := range pirQuerys[0] {
  411. bit := mr.Intn(2)
  412. pirQuerys[0][index] = byte(bit)
  413. }
  414. copy(pirQuerys[1], pirQuerys[0])
  415. //the positon the virtual address will be taken from
  416. pos := mr.Intn(dbWriteSize)
  417. pirQuerys[0][pos] = 1
  418. pirQuerys[1][pos] = 0
  419. //flattens the querys to be able to send them more efficently
  420. messagesFlattened := make([][]byte, 2)
  421. //adds the sharedSecret to the pirQuery
  422. for server := 0; server < 2; server++ {
  423. messagesFlattened[server] = append(messagesFlattened[server], sharedSecret[clientNumber][server][:]...)
  424. }
  425. for server := 0; server < 2; server++ {
  426. messagesFlattened[server] = append(messagesFlattened[server], pirQuerys[server][:]...)
  427. }
  428. var nonce [24]byte
  429. _, err := rand.Read(nonce[:])
  430. if err != nil {
  431. panic("couldn't get randomness for nonce!")
  432. }
  433. encryptedQueryLeader := box.Seal(nonce[:], messagesFlattened[0], &nonce, leaderPublicKey, clientPrivateKey[clientNumber])
  434. _, err = rand.Read(nonce[:])
  435. if err != nil {
  436. panic("couldn't get randomness for nonce!")
  437. }
  438. encryptedQueryFollower := box.Seal(nonce[:], messagesFlattened[1], &nonce, followerPublicKey, clientPrivateKey[clientNumber])
  439. return encryptedQueryLeader, encryptedQueryFollower
  440. }
  441. //generates a topicOfInterest array with random values
  442. func addFakeInterests(max int, topicsOfInterest []int, doAuditing bool) []int {
  443. tmpNeededSubscriptions := neededSubscriptions
  444. if tmpNeededSubscriptions > len(topicList) {
  445. tmpNeededSubscriptions = len(topicList)
  446. }
  447. faketopicsOfInterest := make([]int, tmpNeededSubscriptions)
  448. maxInt := max
  449. //fills the array with unique random ascending values ranging from 0 to max
  450. for i := 0; i < tmpNeededSubscriptions; i++ {
  451. faketopicsOfInterest[i] = mr.Intn(maxInt)
  452. for j := 0; j < i; j++ {
  453. if faketopicsOfInterest[i] == faketopicsOfInterest[j] {
  454. i--
  455. break
  456. }
  457. }
  458. }
  459. if doAuditing {
  460. sort.Ints(faketopicsOfInterest)
  461. return faketopicsOfInterest
  462. }
  463. //adds unique and new random numbers to topicOfInterests until length is satisfied
  464. for _, number := range faketopicsOfInterest {
  465. if !inList(number, topicsOfInterest) {
  466. topicsOfInterest = append(topicsOfInterest, number)
  467. }
  468. if len(topicsOfInterest) == tmpNeededSubscriptions {
  469. break
  470. }
  471. }
  472. sort.Ints(topicsOfInterest)
  473. return topicsOfInterest
  474. }
  475. func inList(number int, list []int) bool {
  476. for _, element := range list {
  477. if element == number {
  478. return true
  479. }
  480. }
  481. return false
  482. }
  483. func receiveTopicLists(leaderConn net.Conn) {
  484. for i := 0; i < 2; i++ {
  485. topicListLength := readFrom(leaderConn, 4)
  486. recTopicList := readFrom(leaderConn, byteToInt(topicListLength))
  487. var tmpTopicList []string
  488. arrayReader := bytes.NewReader(recTopicList[:])
  489. json.NewDecoder(arrayReader).Decode(&tmpTopicList)
  490. if i == 0 {
  491. topicList = tmpTopicList
  492. } else {
  493. archiveTopicList = tmpTopicList
  494. }
  495. }
  496. }
  497. func getRealTweet(clientNumber int) []byte {
  498. fUserList, err := os.Open("/home/simon/goCode/tweets/userList")
  499. if err != nil {
  500. panic(err)
  501. }
  502. defer fUserList.Close()
  503. currentLine := 0
  504. scanner := bufio.NewScanner(fUserList)
  505. userID := ""
  506. for scanner.Scan() {
  507. if currentLine == clientNumber {
  508. userID = scanner.Text()
  509. break
  510. }
  511. currentLine++
  512. }
  513. if userID == "" {
  514. panic("no userID picked")
  515. }
  516. fTweets, err := os.Open("/home/simon/goCode/tweets/userTweets/" + userID)
  517. if err != nil {
  518. panic(err)
  519. }
  520. defer fTweets.Close()
  521. scanner = bufio.NewScanner(fTweets)
  522. lowerBound := timeBounds[round-1]
  523. upperBound := timeBounds[round]
  524. var tweet []byte
  525. for scanner.Scan() {
  526. lineArr := strings.Split(scanner.Text(), ", \"hashtags\"")
  527. lineArr = strings.Split(lineArr[0], ": ")
  528. lineArr = strings.Split(lineArr[1], " \"")
  529. timestamp, _ := strconv.Atoi(lineArr[0])
  530. //transforms timestamp to current time
  531. timestamp -= 1351742400
  532. timestamp += startTime
  533. if timestamp > lowerBound && timestamp < upperBound {
  534. lineArr = strings.Split(scanner.Text(), "[\"")
  535. line := lineArr[1]
  536. lineArr = strings.Split(line, "\"]")
  537. line = lineArr[0]
  538. lineArr = strings.Split(line, ",")
  539. line = strings.Join(lineArr, "")
  540. topicLine := strings.Split(line, "\"")
  541. var topics []byte
  542. for index, topic := range topicLine {
  543. if index%2 == 1 {
  544. continue
  545. }
  546. if len(topics)+len(topic) > dataLength-10 {
  547. break
  548. }
  549. topics = append(topics, []byte(topic)[:]...)
  550. topics = append(topics, []byte(",")[0])
  551. }
  552. topics = topics[:len(topics)-1]
  553. //fmt.Println(string(topics))
  554. tweet = append(tweet, topics...)
  555. tweet = append(tweet, []byte(";")[0])
  556. r := mr.New(mr.NewSource(time.Now().UnixNano()))
  557. num := r.Intn(10000)
  558. if num == 0 {
  559. num = 1
  560. }
  561. tweet = append(tweet, []byte(strconv.Itoa(num) + ";;")[:]...)
  562. //fmt.Println("tweet", string(tweet))
  563. //adds padding
  564. length := dataLength - len(tweet)
  565. padding := make([]byte, length)
  566. rand.Read(padding)
  567. tweet = append(tweet, padding...)
  568. publisherAmount++
  569. return tweet
  570. }
  571. }
  572. tweet = make([]byte, dataLength)
  573. return tweet
  574. }
  575. func getTimeBounds() {
  576. timeBounds = make([]int, 10000)
  577. timeBounds[0] = int(time.Now().Unix())
  578. for index := range timeBounds {
  579. if index == 0 {
  580. continue
  581. }
  582. timeBounds[index] = timeBounds[index-1] + speedUp*(int(3*maxTimePerRound.Seconds())+2)
  583. }
  584. }
  585. func getRandomTweet(clientNumber int) []byte {
  586. var tweet []byte
  587. r := mr.New(mr.NewSource(time.Now().UnixNano()))
  588. maxTopics := r.Intn(6)
  589. if maxTopics == 0 {
  590. maxTopics = 1
  591. }
  592. maxInt := 100
  593. topicNumbers := make([]int, maxTopics)
  594. //fills the array with unique random ascending values ranging from 0 to maxInt
  595. for i := 0; i < maxTopics; i++ {
  596. topicNumbers[i] = mr.Intn(maxInt)
  597. for j := 0; j < i; j++ {
  598. if topicNumbers[i] == topicNumbers[j] {
  599. i--
  600. break
  601. }
  602. }
  603. }
  604. sort.Ints(topicNumbers)
  605. //fmt.Println("topicNumbers", topicNumbers)
  606. var topics []byte
  607. topicIndex := 0
  608. for i := 0; i < len(topicNumbers)*2; i++ {
  609. if i%2 == 0 {
  610. topics = append(topics, byte(topicNumbers[topicIndex]))
  611. topicIndex++
  612. } else if i != (len(topicNumbers)*2)-1 {
  613. topics = append(topics, []byte(",")[0])
  614. }
  615. }
  616. topics = append(topics, []byte(";")[0])
  617. num := r.Intn(100)
  618. if num == 0 {
  619. num = 1
  620. }
  621. text := []byte(strconv.Itoa(num) + ";")
  622. tweet = append(tweet, topics...)
  623. tweet = append(tweet, text...)
  624. tweet = append(tweet, []byte(";")[0])
  625. //fmt.Println("writing", string(text))
  626. //fmt.Println(topicNumbers)
  627. if len(tweet) > 32 {
  628. fmt.Println("lenlen", len(tweet))
  629. }
  630. //adds padding
  631. length := dataLength - len(tweet)
  632. padding := make([]byte, length)
  633. rand.Read(padding)
  634. tweet = append(tweet, padding...)
  635. return tweet
  636. }
  637. //sends the array to the connection
  638. func writeTo(connection net.Conn, array []byte) {
  639. remainingLength := len(array)
  640. for remainingLength > 0 {
  641. if remainingLength >= mtu {
  642. _, err := connection.Write(array[:mtu])
  643. if err != nil {
  644. panic(err)
  645. }
  646. array = array[mtu:]
  647. remainingLength -= mtu
  648. } else {
  649. _, err := connection.Write(array)
  650. if err != nil {
  651. panic(err)
  652. }
  653. remainingLength = 0
  654. }
  655. }
  656. }
  657. //reads an array which is returned and of size "size" from the connection
  658. func readFrom(connection net.Conn, size int) []byte {
  659. var array []byte
  660. remainingSize := size
  661. for remainingSize > 0 {
  662. var err error
  663. toAppend := make([]byte, mtu)
  664. if remainingSize > mtu {
  665. _, err = connection.Read(toAppend)
  666. array = append(array, toAppend...)
  667. remainingSize -= mtu
  668. } else {
  669. _, err = connection.Read(toAppend[:remainingSize])
  670. array = append(array, toAppend[:remainingSize]...)
  671. remainingSize = 0
  672. }
  673. if err != nil {
  674. panic(err)
  675. }
  676. }
  677. return array
  678. }
  679. func intToByte(myInt int) (retBytes []byte) {
  680. retBytes = make([]byte, 4)
  681. retBytes[3] = byte((myInt >> 24) & 0xff)
  682. retBytes[2] = byte((myInt >> 16) & 0xff)
  683. retBytes[1] = byte((myInt >> 8) & 0xff)
  684. retBytes[0] = byte(myInt & 0xff)
  685. return
  686. }
  687. func byteToInt(myBytes []byte) (x int) {
  688. x = int(myBytes[3])<<24 + int(myBytes[2])<<16 + int(myBytes[1])<<8 + int(myBytes[0])
  689. return
  690. }