client.go 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872
  1. package main
  2. /*
  3. #cgo CFLAGS: -O2
  4. #cgo LDFLAGS: -lcrypto -lm
  5. #include "../c/dpf.h"
  6. #include "../c/okvClient.h"
  7. #include "../c/dpf.c"
  8. #include "../c/okvClient.c"
  9. */
  10. import "C"
  11. import (
  12. "2PPS/lib"
  13. "bufio"
  14. "bytes"
  15. "crypto/rand"
  16. "crypto/sha256"
  17. "crypto/tls"
  18. "encoding/json"
  19. "fmt"
  20. "log"
  21. "math/big"
  22. mr "math/rand"
  23. "net"
  24. "os"
  25. "sort"
  26. "strconv"
  27. "strings"
  28. "sync"
  29. "time"
  30. "unsafe"
  31. "golang.org/x/crypto/nacl/box"
  32. )
  33. type tweet struct {
  34. Topics []string
  35. Text string
  36. }
  37. const leader string = "127.0.0.1:4441"
  38. //needs to be changed at leader/follower/client at the same time
  39. const numClients = 5000
  40. //mylimit=8000
  41. //sudo prlimit --nofile=$mylimit --pid $$; ulimit -n $mylimit
  42. //for every terminal
  43. const dataLength int = 256
  44. //Maximum Transport Unit
  45. const mtu int = 1100
  46. var dbWriteSize int
  47. var round int
  48. var topicList []string
  49. var archiveTopicList []string
  50. var neededSubscriptions int
  51. var publisherAmount int
  52. var goodPadding int
  53. var blocksReceived int
  54. var timeBounds []float64
  55. //this translates to a simulated round length of ~2h
  56. var speedUp float64 = 7200 / ((maxTimePerRound.Seconds()) * 3)
  57. var maxTimePerRound time.Duration = 200 * time.Second
  58. var startTime int
  59. var archiveInterests = make([]int, 1)
  60. var sharedSecret [numClients][2][32]byte = createSharedSecret()
  61. var wantsArchive = make([]byte, 1)
  62. var leaderPublicKey *[32]byte
  63. var followerPublicKey *[32]byte
  64. var clientPrivateKey [numClients]*[32]byte
  65. var clientPublicKey [numClients]*[32]byte
  66. func main() {
  67. f, err := os.OpenFile("evalDataClient", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
  68. if err != nil {
  69. log.Fatalf("error opening file: %v", err)
  70. }
  71. defer f.Close()
  72. log.SetOutput(f)
  73. log.Println("simulated Duration", speedUp)
  74. wg := &sync.WaitGroup{}
  75. getTimeBounds()
  76. for i := 0; i < numClients; i++ {
  77. wg.Add(1)
  78. go client(i, f)
  79. time.Sleep(1 * time.Millisecond)
  80. }
  81. wg.Wait()
  82. }
  83. func client(clientNumber int, f *os.File) {
  84. generatedPublicKey, generatedPrivateKey, err := box.GenerateKey(rand.Reader)
  85. if err != nil {
  86. panic(err)
  87. }
  88. clientPrivateKey[clientNumber] = generatedPrivateKey
  89. clientPublicKey[clientNumber] = generatedPublicKey
  90. C.initializeCipher()
  91. //initializes the connection to the leader
  92. conf := &tls.Config{
  93. InsecureSkipVerify: true,
  94. }
  95. leaderConn, err := tls.Dial("tcp", leader, conf)
  96. if err != nil {
  97. fmt.Println("clientNumber", clientNumber)
  98. panic(err)
  99. }
  100. leaderConn.SetDeadline(time.Time{})
  101. //receives topics first so client can participate asap
  102. receiveTopicLists(leaderConn)
  103. //gets the public keys of both servers
  104. var tmpLeaderPubKey [32]byte
  105. _, err = leaderConn.Read(tmpLeaderPubKey[:])
  106. if err != nil {
  107. panic(err)
  108. }
  109. leaderPublicKey = &tmpLeaderPubKey
  110. var tmpFollowerPubKey [32]byte
  111. _, err = leaderConn.Read(tmpFollowerPubKey[:])
  112. if err != nil {
  113. panic(err)
  114. }
  115. followerPublicKey = &tmpFollowerPubKey
  116. //sends own public key
  117. writeTo(leaderConn, clientPublicKey[clientNumber][:])
  118. neededSubscriptionsBytes := readFrom(leaderConn, 4)
  119. neededSubscriptions = byteToInt(neededSubscriptionsBytes)
  120. startTimeBytes := readFrom(leaderConn, 4)
  121. startTime = byteToInt(startTimeBytes)
  122. //setup ends above
  123. //while client is active he is always connected and has to participate
  124. for {
  125. //gets current phase
  126. phase := readFrom(leaderConn, 1)
  127. if phase[0] == 1 {
  128. //gets current dbWriteSize from leader
  129. dbWriteSizeBytes := readFrom(leaderConn, 4)
  130. dbWriteSize = byteToInt(dbWriteSizeBytes)
  131. //roundAsBytes := readFrom(leaderConn, 4)
  132. roundAsBytes := make([]byte, 4)
  133. _, err = leaderConn.Read(roundAsBytes)
  134. if err != nil {
  135. panic(err)
  136. }
  137. round = byteToInt(roundAsBytes)
  138. if clientNumber == 0 {
  139. fmt.Println("Round ", round)
  140. }
  141. //request virtualAddress from leader via pirQuery
  142. encryptedQueryLeader, encryptedQueryFollower := createAuditPIRQuery(clientNumber)
  143. sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false)
  144. pos := receiveVirtualAddress(sharedSecret[clientNumber], leaderConn)
  145. tweet := getRealTweet(clientNumber)
  146. if clientNumber == numClients-1 {
  147. log.Println("Round", round)
  148. log.Println("publisherAmount", publisherAmount)
  149. log.Println("goodPadding", goodPadding)
  150. log.Println("blocksReceived", blocksReceived)
  151. log.Println("goodPadding Percentage", float64(goodPadding)/float64(blocksReceived))
  152. publisherAmount = 0
  153. }
  154. //prep the query
  155. dataSize := len(tweet)
  156. querySize := make([]byte, 4)
  157. cQuerySize := C.int(byteToInt(querySize))
  158. var dpfQueryA *C.uchar
  159. var dpfQueryB *C.uchar
  160. C.prepQuery(C.int(pos), C.int(dbWriteSize), (*C.uchar)(&tweet[0]), C.int(dataSize), &cQuerySize, &dpfQueryA, &dpfQueryB)
  161. intQuerySize := int(cQuerySize) //byteToInt(querySize)
  162. //write the query
  163. queryAPlaintext := C.GoBytes(unsafe.Pointer(dpfQueryA), C.int(intQuerySize))
  164. //encrypts queryA and appends it to message
  165. var nonce [24]byte
  166. //fill nonce with randomness
  167. _, err = rand.Read(nonce[:])
  168. if err != nil {
  169. panic("couldn't get randomness for nonce!")
  170. }
  171. dpfQueryAEncrypted := box.Seal(nonce[:], queryAPlaintext, &nonce, leaderPublicKey, clientPrivateKey[clientNumber])
  172. //encrypts queryB and appends it to message
  173. queryBPlaintext := C.GoBytes(unsafe.Pointer(dpfQueryB), C.int(intQuerySize))
  174. //fill nonce with randomness
  175. _, err = rand.Read(nonce[:])
  176. if err != nil {
  177. panic("couldn't get randomness for nonce!")
  178. }
  179. dpfQueryBEncrypted := box.Seal(nonce[:], queryBPlaintext, &nonce, followerPublicKey, clientPrivateKey[clientNumber])
  180. //writes the dpfQuery to the leader
  181. dpfLengthBytes := intToByte(len(dpfQueryAEncrypted))
  182. writeTo(leaderConn, dpfLengthBytes)
  183. writeTo(leaderConn, dpfQueryAEncrypted)
  184. writeTo(leaderConn, dpfQueryBEncrypted)
  185. C.free(unsafe.Pointer(dpfQueryA))
  186. C.free(unsafe.Pointer(dpfQueryB))
  187. } else if phase[0] == 3 {
  188. /*
  189. possible Values
  190. 0 : new client
  191. leader expects sharedSecrets, expects pirQuery
  192. 1 : update needed
  193. leader sends topicList, performs local update of sharedSecret, expects pirQuery
  194. 2 : no update needed
  195. nothing
  196. */
  197. subPhase := readFrom(leaderConn, 1)
  198. var encryptedQueryLeader, encryptedQueryFollower []byte
  199. //first time participating
  200. if subPhase[0] == 0 {
  201. receiveTopicLists(leaderConn)
  202. encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(int(subPhase[0]), clientNumber)
  203. sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false)
  204. }
  205. //updates the topic list and what client is interested in
  206. if subPhase[0] == 1 {
  207. receiveTopicLists(leaderConn)
  208. //updates local secret
  209. for index := 0; index < 2; index++ {
  210. sharedSecret[clientNumber][index] = sha256.Sum256(sharedSecret[clientNumber][index][:])
  211. }
  212. encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(int(subPhase[0]), clientNumber)
  213. sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false)
  214. }
  215. receiveTweets(sharedSecret[clientNumber], leaderConn, false, clientNumber)
  216. if len(archiveTopicList) > 0 {
  217. wantsArchive[0] = 0
  218. } else {
  219. wantsArchive[0] = 0
  220. }
  221. writeTo(leaderConn, wantsArchive)
  222. //fmt.Println("list", archiveTopicList)
  223. if wantsArchive[0] == 1 && len(archiveTopicList) > 0 {
  224. encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(-1, clientNumber)
  225. sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, true)
  226. receiveTweets(sharedSecret[clientNumber], leaderConn, true, clientNumber)
  227. }
  228. } else {
  229. fmt.Println("Phase", phase)
  230. panic("somethin went wrong")
  231. }
  232. }
  233. }
  234. //creates and sends the pirQuerys for each server
  235. func createPIRQuery(subPhase int, clientNumber int) ([]byte, []byte) {
  236. //later this will be taken from gui, this is only for testing
  237. topicsOfInterest := make([]int, 1)
  238. topicsOfInterest[0] = mr.Intn(10)
  239. archiveInterests[0] = mr.Intn(10)
  240. tmpNeededSubscriptions := neededSubscriptions
  241. if tmpNeededSubscriptions > len(topicList) {
  242. tmpNeededSubscriptions = len(topicList)
  243. }
  244. tmptopicsOfInterest := make([]int, len(topicsOfInterest))
  245. copy(tmptopicsOfInterest, topicsOfInterest)
  246. tmpTopicList := make([]string, len(topicList))
  247. copy(tmpTopicList, topicList)
  248. if wantsArchive[0] == 1 && subPhase == -1 {
  249. tmpNeededSubscriptions = len(archiveInterests)
  250. if tmpNeededSubscriptions > len(archiveTopicList) {
  251. tmpNeededSubscriptions = len(archiveTopicList)
  252. }
  253. tmptopicsOfInterest = archiveInterests //!todo take archiveInterests from gui
  254. tmpTopicList = archiveTopicList
  255. }
  256. //creates fake topicsOfInterest if client is boooring
  257. if len(tmptopicsOfInterest) < tmpNeededSubscriptions && subPhase != -1 {
  258. tmptopicsOfInterest = addFakeInterests(len(tmpTopicList), tmptopicsOfInterest, false)
  259. }
  260. //pirQuery [topicsOfInterest][serverAmount][topicAmount]byte
  261. pirQuerys := make([][][]byte, len(tmptopicsOfInterest))
  262. for i := range pirQuerys {
  263. pirQuerys[i] = make([][]byte, 2)
  264. for j := range pirQuerys[i] {
  265. pirQuerys[i][j] = make([]byte, len(tmpTopicList))
  266. }
  267. }
  268. //for leader
  269. //pirQuery will be filled with random bits
  270. for topic := range tmptopicsOfInterest {
  271. for index := range tmpTopicList {
  272. bit, err := rand.Int(rand.Reader, big.NewInt(2))
  273. if err != nil {
  274. panic(err)
  275. }
  276. pirQuerys[topic][0][index] = byte(bit.Int64())
  277. }
  278. }
  279. tmptopicsOfInterestBytes := make([]byte, len(tmpTopicList))
  280. for index := range tmptopicsOfInterest {
  281. if tmptopicsOfInterest[index] == 1 {
  282. tmptopicsOfInterestBytes[index] = 1
  283. }
  284. }
  285. for topicIndex, topic := range tmptopicsOfInterest {
  286. for index := range tmpTopicList {
  287. if topic == index {
  288. if pirQuerys[topicIndex][0][index] == 1 {
  289. pirQuerys[topicIndex][1][index] = 0
  290. } else {
  291. pirQuerys[topicIndex][1][index] = 1
  292. }
  293. } else {
  294. if pirQuerys[topicIndex][0][index] == 0 {
  295. pirQuerys[topicIndex][1][index] = 0
  296. } else {
  297. pirQuerys[topicIndex][1][index] = 1
  298. }
  299. }
  300. }
  301. }
  302. //flattens the querys to be able to send them more efficently
  303. messagesFlattened := make([][]byte, 2)
  304. //adds the sharedSecret to the first pirQuery when first time participating
  305. if subPhase == 0 {
  306. for server := 0; server < 2; server++ {
  307. messagesFlattened[server] = append(messagesFlattened[server], sharedSecret[clientNumber][server][:]...)
  308. }
  309. }
  310. for server := range messagesFlattened {
  311. for topic := range pirQuerys {
  312. messagesFlattened[server] = append(messagesFlattened[server], pirQuerys[topic][server]...)
  313. }
  314. }
  315. var nonce [24]byte
  316. _, err := rand.Read(nonce[:])
  317. if err != nil {
  318. panic("couldn't get randomness for nonce!")
  319. }
  320. encryptedQueryLeader := box.Seal(nonce[:], messagesFlattened[0], &nonce, leaderPublicKey, clientPrivateKey[clientNumber])
  321. _, err = rand.Read(nonce[:])
  322. if err != nil {
  323. panic("couldn't get randomness for nonce!")
  324. }
  325. encryptedQueryFollower := box.Seal(nonce[:], messagesFlattened[1], &nonce, followerPublicKey, clientPrivateKey[clientNumber])
  326. return encryptedQueryLeader, encryptedQueryFollower
  327. }
  328. func sendQuerys(encryptedQueryLeader, encryptedQueryFollower []byte, leaderConn net.Conn, getArchive bool) {
  329. encryptedLength := len(encryptedQueryLeader)
  330. //sends the pirQuerysLength to the leader
  331. writeTo(leaderConn, intToByte(encryptedLength))
  332. //sends the pirQuerys to the leader
  333. writeTo(leaderConn, encryptedQueryLeader)
  334. writeTo(leaderConn, encryptedQueryFollower)
  335. if getArchive {
  336. writeTo(leaderConn, intToByte(len(archiveInterests)))
  337. }
  338. }
  339. func receiveVirtualAddress(sharedSecret [2][32]byte, leaderConn net.Conn) int {
  340. virtualAddressByte := readFrom(leaderConn, 4)
  341. //xores the sharedSecret
  342. for h := 0; h < 2; h++ {
  343. for i := 0; i < 4; i++ {
  344. virtualAddressByte[i] = virtualAddressByte[i] ^ sharedSecret[h][i]
  345. }
  346. }
  347. return byteToInt(virtualAddressByte)
  348. }
  349. func receiveTweets(sharedSecret [2][32]byte, leaderConn net.Conn, getArchive bool, clientNumber int) {
  350. tmpNeededSubscriptions := neededSubscriptions
  351. if tmpNeededSubscriptions > len(topicList) {
  352. tmpNeededSubscriptions = len(topicList)
  353. }
  354. if getArchive {
  355. tmpNeededSubscriptions = len(archiveInterests)
  356. if tmpNeededSubscriptions > len(archiveTopicList) {
  357. tmpNeededSubscriptions = len(archiveTopicList)
  358. }
  359. }
  360. for i := 0; i < tmpNeededSubscriptions; i++ {
  361. if !getArchive {
  362. blocksReceived++
  363. }
  364. //client receives tweets
  365. tweetsLengthBytes := readFrom(leaderConn, 4)
  366. tweetsLength := byteToInt(tweetsLengthBytes)
  367. tweets := readFrom(leaderConn, tweetsLength)
  368. //fmt.Println(tweets[:10])
  369. //expand sharedSecret so it is of right length
  370. expandBy := len(tweets) / 32
  371. expandedSharedSecrets := make([][]byte, 2)
  372. for i := 0; i < 2; i++ {
  373. for j := 0; j < expandBy; j++ {
  374. expandedSharedSecrets[i] = append(expandedSharedSecrets[i], sharedSecret[i][:]...)
  375. }
  376. }
  377. //xors the received messge into the message to display
  378. for i := 0; i < 2; i++ {
  379. lib.Xor(expandedSharedSecrets[i][:], tweets)
  380. }
  381. //fmt.Println("PubKey", clientPublicKey[clientNumber], "Bytes", tweets)
  382. index := strings.Index(string(tweets), ";;")
  383. if index != -1 {
  384. textArr := strings.Split(string(tweets), ";;;")
  385. text := textArr[:len(textArr)-1]
  386. //fmt.Println("Round", round, text[0], "Length", len(tweets))
  387. if text[1] != "" {
  388. text[1] = text[1][1:]
  389. }
  390. ok := strings.Contains(text[0], text[1])
  391. if !ok {
  392. goodPadding++
  393. //fmt.Println("Round", round, text[0], "Length", len(tweets))
  394. //fmt.Println("padding", text[1])
  395. }
  396. } else if index == -1 && tweets[0] != 0 {
  397. fmt.Println("error")
  398. fmt.Println("round", round, string(tweets), "length", len(tweets))
  399. return
  400. //panic("received text not of correct format")
  401. }
  402. }
  403. }
  404. //creates a shared secret for each server
  405. func createSharedSecret() [numClients][2][32]byte {
  406. var tmpSharedSecret [numClients][2][32]byte
  407. for i := 0; i < numClients; i++ {
  408. for j := 0; j < 2; j++ {
  409. _, err := rand.Read(tmpSharedSecret[i][j][:])
  410. if err != nil {
  411. panic("couldn't get randomness for sharedSecret!")
  412. }
  413. }
  414. }
  415. return tmpSharedSecret
  416. }
  417. func createAuditPIRQuery(clientNumber int) ([]byte, []byte) {
  418. //pirQuery [serverAmount][dbWriteSize]byte
  419. pirQuerys := make([][]byte, 2)
  420. for i := range pirQuerys {
  421. pirQuerys[i] = make([]byte, dbWriteSize)
  422. }
  423. //for leader
  424. //pirQuery will be filled with random bits
  425. for index := range pirQuerys[0] {
  426. bit := mr.Intn(2)
  427. pirQuerys[0][index] = byte(bit)
  428. }
  429. copy(pirQuerys[1], pirQuerys[0])
  430. //the positon the virtual address will be taken from
  431. pos := mr.Intn(dbWriteSize)
  432. pirQuerys[0][pos] = 1
  433. pirQuerys[1][pos] = 0
  434. //flattens the querys to be able to send them more efficently
  435. messagesFlattened := make([][]byte, 2)
  436. //adds the sharedSecret to the pirQuery
  437. for server := 0; server < 2; server++ {
  438. messagesFlattened[server] = append(messagesFlattened[server], sharedSecret[clientNumber][server][:]...)
  439. }
  440. for server := 0; server < 2; server++ {
  441. messagesFlattened[server] = append(messagesFlattened[server], pirQuerys[server][:]...)
  442. }
  443. var nonce [24]byte
  444. _, err := rand.Read(nonce[:])
  445. if err != nil {
  446. panic("couldn't get randomness for nonce!")
  447. }
  448. encryptedQueryLeader := box.Seal(nonce[:], messagesFlattened[0], &nonce, leaderPublicKey, clientPrivateKey[clientNumber])
  449. _, err = rand.Read(nonce[:])
  450. if err != nil {
  451. panic("couldn't get randomness for nonce!")
  452. }
  453. encryptedQueryFollower := box.Seal(nonce[:], messagesFlattened[1], &nonce, followerPublicKey, clientPrivateKey[clientNumber])
  454. return encryptedQueryLeader, encryptedQueryFollower
  455. }
  456. //generates a topicOfInterest array with random values
  457. func addFakeInterests(max int, topicsOfInterest []int, doAuditing bool) []int {
  458. tmpNeededSubscriptions := neededSubscriptions
  459. if tmpNeededSubscriptions > len(topicList) {
  460. tmpNeededSubscriptions = len(topicList)
  461. }
  462. faketopicsOfInterest := make([]int, tmpNeededSubscriptions)
  463. maxInt := max
  464. //fills the array with unique random ascending values ranging from 0 to max
  465. for i := 0; i < tmpNeededSubscriptions; i++ {
  466. faketopicsOfInterest[i] = mr.Intn(maxInt)
  467. for j := 0; j < i; j++ {
  468. if faketopicsOfInterest[i] == faketopicsOfInterest[j] {
  469. i--
  470. break
  471. }
  472. }
  473. }
  474. if doAuditing {
  475. sort.Ints(faketopicsOfInterest)
  476. return faketopicsOfInterest
  477. }
  478. //adds unique and new random numbers to topicOfInterests until length is satisfied
  479. for _, number := range faketopicsOfInterest {
  480. if !inList(number, topicsOfInterest) {
  481. topicsOfInterest = append(topicsOfInterest, number)
  482. }
  483. if len(topicsOfInterest) == tmpNeededSubscriptions {
  484. break
  485. }
  486. }
  487. sort.Ints(topicsOfInterest)
  488. return topicsOfInterest
  489. }
  490. func inList(number int, list []int) bool {
  491. for _, element := range list {
  492. if element == number {
  493. return true
  494. }
  495. }
  496. return false
  497. }
  498. func receiveTopicLists(leaderConn net.Conn) {
  499. for i := 0; i < 2; i++ {
  500. topicListLength := readFrom(leaderConn, 4)
  501. recTopicList := readFrom(leaderConn, byteToInt(topicListLength))
  502. var tmpTopicList []string
  503. arrayReader := bytes.NewReader(recTopicList[:])
  504. json.NewDecoder(arrayReader).Decode(&tmpTopicList)
  505. if i == 0 {
  506. topicList = tmpTopicList
  507. } else {
  508. archiveTopicList = tmpTopicList
  509. }
  510. }
  511. }
  512. func getRealTweet(clientNumber int) []byte {
  513. fUserList, err := os.Open("/home/simon/goCode/tweets/userList")
  514. if err != nil {
  515. panic(err)
  516. }
  517. defer fUserList.Close()
  518. currentLine := 0
  519. scanner := bufio.NewScanner(fUserList)
  520. userID := ""
  521. for scanner.Scan() {
  522. if currentLine == clientNumber {
  523. userID = scanner.Text()
  524. break
  525. }
  526. currentLine++
  527. }
  528. if userID == "" {
  529. panic("no userID picked")
  530. }
  531. fTweets, err := os.Open("/home/simon/goCode/tweets/userTweets/" + userID)
  532. if err != nil {
  533. panic(err)
  534. }
  535. defer fTweets.Close()
  536. scanner = bufio.NewScanner(fTweets)
  537. lowerBound := timeBounds[round-1]
  538. upperBound := timeBounds[round]
  539. var tweet []byte
  540. for scanner.Scan() {
  541. lineArr := strings.Split(scanner.Text(), ", \"hashtags\"")
  542. lineArr = strings.Split(lineArr[0], ": ")
  543. lineArr = strings.Split(lineArr[1], " \"")
  544. timestamp, _ := strconv.Atoi(lineArr[0])
  545. //transforms timestamp to current time
  546. timestamp -= 1351742400
  547. timestamp += startTime
  548. if float64(timestamp) > lowerBound && float64(timestamp) < upperBound {
  549. lineArr = strings.Split(scanner.Text(), "[\"")
  550. line := lineArr[1]
  551. lineArr = strings.Split(line, "\"]")
  552. line = lineArr[0]
  553. lineArr = strings.Split(line, ",")
  554. line = strings.Join(lineArr, "")
  555. topicLine := strings.Split(line, "\"")
  556. var topics []byte
  557. for index, topic := range topicLine {
  558. if index%2 == 1 {
  559. continue
  560. }
  561. if len(topics)+len(topic) > dataLength-10 {
  562. break
  563. }
  564. topics = append(topics, []byte(topic)[:]...)
  565. topics = append(topics, []byte(",")[0])
  566. }
  567. topics = topics[:len(topics)-1]
  568. //fmt.Println(string(topics))
  569. tweet = append(tweet, topics...)
  570. tweet = append(tweet, []byte(";")[0])
  571. r := mr.New(mr.NewSource(time.Now().UnixNano()))
  572. num := r.Intn(10000)
  573. if num == 0 {
  574. num = 1
  575. }
  576. tweet = append(tweet, []byte(strconv.Itoa(num) + ";;")[:]...)
  577. //fmt.Println("tweet", string(tweet))
  578. //adds padding
  579. length := dataLength - len(tweet)
  580. padding := make([]byte, length)
  581. rand.Read(padding)
  582. tweet = append(tweet, padding...)
  583. publisherAmount++
  584. return tweet
  585. }
  586. }
  587. tweet = make([]byte, dataLength)
  588. return tweet
  589. }
  590. func getTimeBounds() {
  591. timeBounds = make([]float64, 10000)
  592. timeBounds[0] = float64(time.Now().Unix())
  593. for index := range timeBounds {
  594. if index == 0 {
  595. continue
  596. }
  597. timeBounds[index] = timeBounds[index-1] + speedUp*(3*maxTimePerRound.Seconds()+2)
  598. }
  599. }
  600. func getRandomTweet(clientNumber int) []byte {
  601. var tweet []byte
  602. r := mr.New(mr.NewSource(time.Now().UnixNano()))
  603. maxTopics := r.Intn(6)
  604. if maxTopics == 0 {
  605. maxTopics = 1
  606. }
  607. maxInt := 100
  608. topicNumbers := make([]int, maxTopics)
  609. //fills the array with unique random ascending values ranging from 0 to maxInt
  610. for i := 0; i < maxTopics; i++ {
  611. topicNumbers[i] = mr.Intn(maxInt)
  612. for j := 0; j < i; j++ {
  613. if topicNumbers[i] == topicNumbers[j] {
  614. i--
  615. break
  616. }
  617. }
  618. }
  619. sort.Ints(topicNumbers)
  620. //fmt.Println("topicNumbers", topicNumbers)
  621. var topics []byte
  622. topicIndex := 0
  623. for i := 0; i < len(topicNumbers)*2; i++ {
  624. if i%2 == 0 {
  625. topics = append(topics, byte(topicNumbers[topicIndex]))
  626. topicIndex++
  627. } else if i != (len(topicNumbers)*2)-1 {
  628. topics = append(topics, []byte(",")[0])
  629. }
  630. }
  631. topics = append(topics, []byte(";")[0])
  632. num := r.Intn(100)
  633. if num == 0 {
  634. num = 1
  635. }
  636. text := []byte(strconv.Itoa(num) + ";")
  637. tweet = append(tweet, topics...)
  638. tweet = append(tweet, text...)
  639. tweet = append(tweet, []byte(";")[0])
  640. //fmt.Println("writing", string(text))
  641. //fmt.Println(topicNumbers)
  642. if len(tweet) > 32 {
  643. fmt.Println("lenlen", len(tweet))
  644. }
  645. //adds padding
  646. length := dataLength - len(tweet)
  647. padding := make([]byte, length)
  648. rand.Read(padding)
  649. tweet = append(tweet, padding...)
  650. return tweet
  651. }
  652. //sends the array to the connection
  653. func writeTo(connection net.Conn, array []byte) {
  654. remainingLength := len(array)
  655. for remainingLength > 0 {
  656. if remainingLength >= mtu {
  657. _, err := connection.Write(array[:mtu])
  658. if err != nil {
  659. panic(err)
  660. }
  661. array = array[mtu:]
  662. remainingLength -= mtu
  663. } else {
  664. _, err := connection.Write(array)
  665. if err != nil {
  666. panic(err)
  667. }
  668. remainingLength = 0
  669. }
  670. }
  671. }
  672. //reads an array which is returned and of size "size" from the connection
  673. func readFrom(connection net.Conn, size int) []byte {
  674. var array []byte
  675. remainingSize := size
  676. for remainingSize > 0 {
  677. var err error
  678. toAppend := make([]byte, mtu)
  679. if remainingSize > mtu {
  680. _, err = connection.Read(toAppend)
  681. array = append(array, toAppend...)
  682. remainingSize -= mtu
  683. } else {
  684. _, err = connection.Read(toAppend[:remainingSize])
  685. array = append(array, toAppend[:remainingSize]...)
  686. remainingSize = 0
  687. }
  688. if err != nil {
  689. panic(err)
  690. }
  691. }
  692. return array
  693. }
  694. func intToByte(myInt int) (retBytes []byte) {
  695. retBytes = make([]byte, 4)
  696. retBytes[3] = byte((myInt >> 24) & 0xff)
  697. retBytes[2] = byte((myInt >> 16) & 0xff)
  698. retBytes[1] = byte((myInt >> 8) & 0xff)
  699. retBytes[0] = byte(myInt & 0xff)
  700. return
  701. }
  702. func byteToInt(myBytes []byte) (x int) {
  703. x = int(myBytes[3])<<24 + int(myBytes[2])<<16 + int(myBytes[1])<<8 + int(myBytes[0])
  704. return
  705. }