leader.go 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431
  1. package main
  2. //#cgo CFLAGS: -fopenmp -O2
  3. //#cgo LDFLAGS: -lcrypto -lm -fopenmp
  4. //#include "../c/dpf.h"
  5. //#include "../c/okv.h"
  6. //#include "../c/dpf.c"
  7. //#include "../c/okv.c"
  8. import "C"
  9. import (
  10. "crypto/rand"
  11. "crypto/rsa"
  12. "crypto/sha256"
  13. "crypto/tls"
  14. "crypto/x509"
  15. "crypto/x509/pkix"
  16. "encoding/pem"
  17. "fmt"
  18. "log"
  19. "math"
  20. "math/big"
  21. mr "math/rand"
  22. "net"
  23. "os"
  24. "sort"
  25. "strings"
  26. "sync"
  27. "time"
  28. lib "2PPS/lib"
  29. "unsafe"
  30. "golang.org/x/crypto/nacl/box"
  31. )
  32. //this stores all neccessary information for each client
  33. type clientKeys struct {
  34. roundsParticipating int
  35. PublicKey *[32]byte
  36. SharedSecret [32]byte
  37. PirQuery [][]byte
  38. }
  39. var clientData = make(map[net.Addr]clientKeys)
  40. const follower string = "127.0.0.1:4442"
  41. //Maximum Transport Unit
  42. const mtu int = 1100
  43. var leaderPrivateKey *[32]byte
  44. var leaderPublicKey *[32]byte
  45. var followerPublicKey *[32]byte
  46. const maxNumberOfClients = 1000000
  47. var topicList []byte
  48. var topicAmount int
  49. var archiveTopicAmount int
  50. // every roundsBeforeUpdate the client updates his pirQuery
  51. const roundsBeforeUpdate = 1
  52. const neededSubscriptions = 1
  53. const numThreads = 12
  54. const dataLength = 256
  55. const numClients = 1000
  56. const minDBWriteSize = numClients * 0.5
  57. //riposte says 19.5
  58. const dbSizeFactor = 5
  59. const publisherGuess = 0.2
  60. var dbWriteSize float64 = numClients * dbSizeFactor * publisherGuess
  61. //this is the number of positions for auditing
  62. var extraPositions int = 10
  63. var collisionCounter []float64
  64. var clientsConnected float64
  65. var maxTimePerRound time.Duration = 3 * time.Second
  66. //counts the number of rounds
  67. var round int
  68. var clientsServedPhase1 []int
  69. var clientsServedPhase3 []int
  70. var startPhase1 time.Time
  71. var startPhase2 time.Time
  72. var startPhase3 time.Time
  73. //only prints auditing time for one client per round
  74. var firstAuditPrint bool
  75. var auditingStart time.Time
  76. var auditingEnd time.Time
  77. var startTime time.Time
  78. var startTimeRound time.Time
  79. //channel for goroutine communication with clients
  80. var phase1Channel = make(chan net.Conn, maxNumberOfClients)
  81. var phase3Channel = make(chan net.Conn, maxNumberOfClients)
  82. //variables for calculating the dbWrite size
  83. const publisherRounds int = 10
  84. var publisherAmount float64
  85. var publisherHistory [publisherRounds]int
  86. var firstTweetSend bool
  87. func main() {
  88. f, err := os.OpenFile("evalData", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
  89. if err != nil {
  90. log.Fatalf("error opening file: %v", err)
  91. }
  92. defer f.Close()
  93. log.SetOutput(f)
  94. log.Println("roundsBeforeUpdate", roundsBeforeUpdate)
  95. log.Println("neededSubscriptions", neededSubscriptions)
  96. log.Println("dataLength", dataLength)
  97. log.Println("maxTimePerRound~", maxTimePerRound*3)
  98. log.Println("extraPositions", extraPositions)
  99. log.Println("numClients", numClients)
  100. log.Println("Archiving is done every 24h, no Clients wants Archive")
  101. //prevents race conditions for wrtiting
  102. m := &sync.RWMutex{}
  103. startTime = time.Now()
  104. clientsServedPhase1 = make([]int, 1000)
  105. clientsServedPhase3 = make([]int, 1000)
  106. generatedPublicKey, generatedPrivateKey, err := box.GenerateKey(rand.Reader)
  107. if err != nil {
  108. panic(err)
  109. }
  110. //why is this neccessary?
  111. leaderPrivateKey = generatedPrivateKey
  112. leaderPublicKey = generatedPublicKey
  113. C.initializeServer(C.int(numThreads))
  114. //calls follower for setup
  115. conf := &tls.Config{
  116. InsecureSkipVerify: true,
  117. }
  118. followerConnection, err := tls.Dial("tcp", follower, conf)
  119. if err != nil {
  120. panic(err)
  121. }
  122. followerConnection.SetDeadline(time.Time{})
  123. //receives follower publicKey
  124. var tmpFollowerPubKey [32]byte
  125. _, err = followerConnection.Read(tmpFollowerPubKey[:])
  126. if err != nil {
  127. panic(err)
  128. }
  129. followerPublicKey = &tmpFollowerPubKey
  130. //send publicKey to follower
  131. writeTo(followerConnection, leaderPublicKey[:])
  132. writeTo(followerConnection, intToByte(neededSubscriptions))
  133. writeTo(followerConnection, intToByte(int(dbWriteSize)))
  134. //goroutine for accepting new clients
  135. go func() {
  136. leaderConnectionPrivateKey, err := rsa.GenerateKey(rand.Reader, 2048)
  137. if err != nil {
  138. panic(err)
  139. }
  140. // Generate a pem block with the private key
  141. keyPem := pem.EncodeToMemory(&pem.Block{
  142. Type: "RSA PRIVATE KEY",
  143. Bytes: x509.MarshalPKCS1PrivateKey(leaderConnectionPrivateKey),
  144. })
  145. tml := x509.Certificate{
  146. // you can add any attr that you need
  147. NotBefore: time.Now(),
  148. NotAfter: time.Now().AddDate(5, 0, 0),
  149. // you have to generate a different serial number each execution
  150. SerialNumber: big.NewInt(123123),
  151. Subject: pkix.Name{
  152. CommonName: "New Name",
  153. Organization: []string{"New Org."},
  154. },
  155. BasicConstraintsValid: true,
  156. }
  157. cert, err := x509.CreateCertificate(rand.Reader, &tml, &tml, &leaderConnectionPrivateKey.PublicKey, leaderConnectionPrivateKey)
  158. if err != nil {
  159. panic(err)
  160. }
  161. // Generate a pem block with the certificate
  162. certPem := pem.EncodeToMemory(&pem.Block{
  163. Type: "CERTIFICATE",
  164. Bytes: cert,
  165. })
  166. tlsCert, err := tls.X509KeyPair(certPem, keyPem)
  167. if err != nil {
  168. panic(err)
  169. }
  170. config := &tls.Config{Certificates: []tls.Certificate{tlsCert}}
  171. //listens for clients
  172. lnClients, err := tls.Listen("tcp", ":4441", config)
  173. if err != nil {
  174. panic(err)
  175. }
  176. defer lnClients.Close()
  177. for {
  178. clientConnection, err := lnClients.Accept()
  179. if err != nil {
  180. fmt.Println("Client connection error 1", err)
  181. clientConnection.Close()
  182. break
  183. }
  184. clientConnection.SetDeadline(time.Time{})
  185. //sends topicList so client can participate in phase 3 asap
  186. errorBool := sendTopicLists(clientConnection, followerConnection, true)
  187. if errorBool {
  188. break
  189. }
  190. //send leader publicKey
  191. _, err = clientConnection.Write(leaderPublicKey[:])
  192. if err != nil {
  193. fmt.Println("Client connection error 2", err)
  194. clientConnection.Close()
  195. break
  196. }
  197. //send follower publicKey
  198. _, err = clientConnection.Write(followerPublicKey[:])
  199. if err != nil {
  200. fmt.Println("Client connection error 3", err)
  201. clientConnection.Close()
  202. break
  203. }
  204. var clientPublicKey *[32]byte
  205. var tmpClientPublicKey [32]byte
  206. //gets publicKey from client
  207. _, err = clientConnection.Read(tmpClientPublicKey[:])
  208. if err != nil {
  209. fmt.Println("Client connection error 4", err)
  210. clientConnection.Close()
  211. break
  212. }
  213. _, err = clientConnection.Write(intToByte(neededSubscriptions))
  214. if err != nil {
  215. fmt.Println("Client connection error 5", err)
  216. clientConnection.Close()
  217. break
  218. }
  219. _, err = clientConnection.Write(intToByte(int(startTime.Unix())))
  220. if err != nil {
  221. fmt.Println("Client connection error 6", err)
  222. clientConnection.Close()
  223. break
  224. }
  225. clientPublicKey = &tmpClientPublicKey
  226. //this is the key for map(client data)
  227. remoteAddress := clientConnection.RemoteAddr()
  228. //pirQuery will be added in phase 3
  229. //bs! only want to set roundsParticipating and answerAmount to 0, mb there is a better way
  230. //will work for now
  231. var emptyArray [32]byte
  232. var emptyByteArray [][]byte
  233. keys := clientKeys{0, clientPublicKey, emptyArray, emptyByteArray}
  234. m.Lock()
  235. clientData[remoteAddress] = keys
  236. m.Unlock()
  237. phase1Channel <- clientConnection
  238. clientsConnected++
  239. }
  240. }()
  241. wg := &sync.WaitGroup{}
  242. //the current phase
  243. phase := make([]byte, 1)
  244. for {
  245. startPhase1 = time.Now()
  246. startTimeRound = time.Now()
  247. firstAuditPrint = true
  248. phase[0] = 1
  249. round++
  250. fmt.Println("Round", round)
  251. fmt.Println("clientsServedPhase1", clientsServedPhase1[round-1])
  252. fmt.Println("clientsServedPhase3", clientsServedPhase3[round-1])
  253. log.Println()
  254. log.Println("Phase 1 Round", round)
  255. log.Println()
  256. bytesSaved := lib.GetBytesSaved()
  257. log.Println("bytesSaved Percentage", bytesSaved)
  258. //creates a new write Db for this round
  259. for i := 0; i < int(dbWriteSize); i++ {
  260. C.createDb(C.int(1), C.int(dataLength))
  261. }
  262. //creates a new db containing virtual addresses for auditing
  263. virtualAddresses := createVirtualAddresses()
  264. //send all virtualAddresses to follower
  265. for i := 0; i <= int(dbWriteSize); i++ {
  266. writeTo(followerConnection, intToByte(virtualAddresses[i]))
  267. }
  268. //moves all clients to phase1
  269. if len(phase3Channel) > 0 {
  270. for client := range phase3Channel {
  271. phase1Channel <- client
  272. if len(phase3Channel) == 0 {
  273. break
  274. }
  275. }
  276. }
  277. for id := 0; id < numThreads; id++ {
  278. wg.Add(1)
  279. followerConnection, err := tls.Dial("tcp", follower, conf)
  280. if err != nil {
  281. panic(err)
  282. }
  283. followerConnection.SetDeadline(time.Time{})
  284. go phase1(id, phase, followerConnection, wg, m, virtualAddresses)
  285. }
  286. wg.Wait()
  287. log.Println("fullDurationPhase1", time.Since(startPhase1).Seconds())
  288. log.Println("fullAuditingDuration~", auditingEnd.Sub(auditingStart).Seconds()*clientsConnected)
  289. log.Println("auditingPercentage", (auditingEnd.Sub(auditingStart).Seconds()*clientsConnected)/(time.Since(startPhase1).Seconds()))
  290. //Phase 2
  291. startPhase2 = time.Now()
  292. followerConnection, err := tls.Dial("tcp", follower, conf)
  293. if err != nil {
  294. panic(err)
  295. }
  296. followerConnection.SetDeadline(time.Time{})
  297. phase2(followerConnection)
  298. log.Println("fullDurationPhase2", time.Since(startPhase2).Seconds())
  299. //Phase 3
  300. //moves all clients to phase3
  301. if len(phase1Channel) > 0 {
  302. for client := range phase1Channel {
  303. phase3Channel <- client
  304. if len(phase1Channel) == 0 {
  305. break
  306. }
  307. }
  308. }
  309. startPhase3 = time.Now()
  310. //no tweets -> continue to phase 1 and mb get tweets
  311. topicList, topicAmount = lib.GetTopicList(0)
  312. if len(topicList) == 0 {
  313. continue
  314. }
  315. firstTweetSend = true
  316. phase[0] = 3
  317. startTimeRound = time.Now()
  318. for id := 0; id < numThreads; id++ {
  319. wg.Add(1)
  320. followerConnection, err := tls.Dial("tcp", follower, conf)
  321. if err != nil {
  322. panic(err)
  323. }
  324. followerConnection.SetDeadline(time.Time{})
  325. go phase3(id, phase, followerConnection, wg, m)
  326. }
  327. wg.Wait()
  328. log.Println("fullDurationPhase3", time.Since(startPhase3).Seconds())
  329. lib.CleanUpdbR(round)
  330. }
  331. }
  332. func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGroup, m *sync.RWMutex, virtualAddresses []int) {
  333. roundAsBytes := intToByte(round)
  334. gotClient := make([]byte, 1)
  335. gotClient[0] = 0
  336. //wait until time is up
  337. for len(phase1Channel) == 0 {
  338. if time.Since(startTimeRound) > maxTimePerRound {
  339. //tells follower that this worker is done
  340. writeTo(followerConnection, gotClient)
  341. wg.Done()
  342. return
  343. }
  344. time.Sleep(1 * time.Second)
  345. }
  346. for clientConnection := range phase1Channel {
  347. clientsServedPhase1[round] = clientsServedPhase1[round] + 1
  348. gotClient[0] = 1
  349. //tells follower that this worker got a clientConnection
  350. writeTo(followerConnection, gotClient)
  351. //sends clients publicKey to follower
  352. m.RLock()
  353. clientPublicKey := clientData[clientConnection.RemoteAddr()].PublicKey
  354. m.RUnlock()
  355. writeTo(followerConnection, clientPublicKey[:])
  356. //setup the worker-specific db
  357. dbSize := int(C.dbSize)
  358. db := make([][]byte, dbSize)
  359. for i := 0; i < dbSize; i++ {
  360. db[i] = make([]byte, int(C.db[i].dataSize))
  361. }
  362. //tells client that phase 1 has begun
  363. errorBool := writeToWError(clientConnection, phase, followerConnection, 5)
  364. if errorBool {
  365. contBool := handleClientDC(wg, followerConnection, phase1Channel)
  366. if contBool {
  367. continue
  368. } else {
  369. return
  370. }
  371. }
  372. //tells client current dbWriteSize
  373. errorBool = writeToWError(clientConnection, intToByte(int(dbWriteSize)), followerConnection, 5)
  374. if errorBool {
  375. contBool := handleClientDC(wg, followerConnection, phase1Channel)
  376. if contBool {
  377. continue
  378. } else {
  379. return
  380. }
  381. }
  382. //tells client current round
  383. errorBool = writeToWError(clientConnection, roundAsBytes, followerConnection, 5)
  384. if errorBool {
  385. contBool := handleClientDC(wg, followerConnection, phase1Channel)
  386. if contBool {
  387. continue
  388. } else {
  389. return
  390. }
  391. }
  392. //begin auditing
  393. if id == 0 && firstAuditPrint {
  394. auditingStart = time.Now()
  395. }
  396. m.RLock()
  397. var clientKeys = clientData[clientConnection.RemoteAddr()]
  398. m.RUnlock()
  399. clientKeys, pirQuery, errorBool := handlePirQuery(clientKeys, clientConnection, followerConnection, 0, true)
  400. if errorBool {
  401. contBool := handleClientDC(wg, followerConnection, phase1Channel)
  402. if contBool {
  403. continue
  404. } else {
  405. return
  406. }
  407. }
  408. errorBool = getSendVirtualAddress(pirQuery[0], virtualAddresses, clientKeys.SharedSecret, clientConnection, followerConnection)
  409. if errorBool {
  410. contBool := handleClientDC(wg, followerConnection, phase1Channel)
  411. if contBool {
  412. continue
  413. } else {
  414. return
  415. }
  416. }
  417. if id == 0 && firstAuditPrint {
  418. firstAuditPrint = false
  419. auditingEnd = time.Now()
  420. log.Println("Auditing duration", time.Since(auditingStart).Seconds(), "numVirtualAddresses", len(virtualAddresses))
  421. }
  422. m.Lock()
  423. clientData[clientConnection.RemoteAddr()] = clientKeys
  424. m.Unlock()
  425. //accept dpfQuery from client
  426. dpfLengthBytes, errorBool := readFrom(clientConnection, 4, followerConnection, 5)
  427. if errorBool {
  428. contBool := handleClientDC(wg, followerConnection, phase1Channel)
  429. if contBool {
  430. continue
  431. } else {
  432. return
  433. }
  434. }
  435. dpfLength := byteToInt(dpfLengthBytes)
  436. dpfQueryAEncrypted, errorBool := readFrom(clientConnection, dpfLength, followerConnection, 5)
  437. if errorBool {
  438. contBool := handleClientDC(wg, followerConnection, phase1Channel)
  439. if contBool {
  440. continue
  441. } else {
  442. return
  443. }
  444. }
  445. dpfQueryBEncrypted, errorBool := readFrom(clientConnection, dpfLength, followerConnection, 5)
  446. if errorBool {
  447. contBool := handleClientDC(wg, followerConnection, phase1Channel)
  448. if contBool {
  449. continue
  450. } else {
  451. return
  452. }
  453. }
  454. writeToWError(followerConnection, dpfLengthBytes, nil, 0)
  455. writeToWError(followerConnection, dpfQueryBEncrypted, nil, 0)
  456. //decrypt dpfQueryA for sorting into db
  457. var decryptNonce [24]byte
  458. copy(decryptNonce[:], dpfQueryAEncrypted[:24])
  459. dpfQueryA, ok := box.Open(nil, dpfQueryAEncrypted[24:], &decryptNonce, clientPublicKey, leaderPrivateKey)
  460. if !ok {
  461. panic("dpfQueryA decryption not ok")
  462. }
  463. ds := int(C.db[0].dataSize)
  464. dataShareLeader := make([]byte, ds)
  465. pos := C.getUint128_t(C.int(virtualAddresses[int(dbWriteSize)]))
  466. C.evalDPF(C.ctx[id], (*C.uchar)(&dpfQueryA[0]), pos, C.int(ds), (*C.uchar)(&dataShareLeader[0]))
  467. dataShareFollower, _ := readFrom(followerConnection, ds, nil, 0)
  468. writeToWError(followerConnection, dataShareLeader, nil, 0)
  469. auditXOR := make([]byte, ds)
  470. passedAudit := true
  471. for i := 0; i < ds; i++ {
  472. auditXOR[i] = dataShareLeader[i] ^ dataShareFollower[i]
  473. //client tried to write to a position that is not a virtuallAddress
  474. if auditXOR[i] != 0 {
  475. clientConnection.Close()
  476. passedAudit = false
  477. }
  478. }
  479. if passedAudit {
  480. //run dpf, xor into local db
  481. for i := 0; i < dbSize; i++ {
  482. ds := int(C.db[i].dataSize)
  483. dataShare := make([]byte, ds)
  484. pos := C.getUint128_t(C.int(virtualAddresses[i]))
  485. C.evalDPF(C.ctx[id], (*C.uchar)(&dpfQueryA[0]), pos, C.int(ds), (*C.uchar)(&dataShare[0]))
  486. for j := 0; j < ds; j++ {
  487. db[i][j] = db[i][j] ^ dataShare[j]
  488. }
  489. }
  490. //xor the worker's DB into the main DB
  491. for i := 0; i < dbSize; i++ {
  492. m.Lock()
  493. C.xorIn(C.int(i), (*C.uchar)(&db[i][0]))
  494. m.Unlock()
  495. }
  496. phase3Channel <- clientConnection
  497. }
  498. //loop that waits for new client or leaves phase1 if time is up
  499. for {
  500. if time.Since(startTimeRound) < maxTimePerRound || len(phase1Channel) > 0 {
  501. //this worker handles the next client
  502. if len(phase1Channel) > 0 {
  503. break
  504. //this worker waits for next client
  505. } else {
  506. time.Sleep(1 * time.Second)
  507. }
  508. //times up
  509. } else {
  510. //tells follower that this worker is done
  511. gotClient[0] = 0
  512. writeTo(followerConnection, gotClient)
  513. wg.Done()
  514. return
  515. }
  516. }
  517. }
  518. }
  519. func phase2(followerConnection net.Conn) {
  520. //gets current seed
  521. seedLeader := make([]byte, 16)
  522. C.readSeed((*C.uchar)(&seedLeader[0]))
  523. //get data
  524. dbSize := int(C.dbSize)
  525. tmpdbLeader := make([][]byte, dbSize)
  526. for i := range tmpdbLeader {
  527. tmpdbLeader[i] = make([]byte, dataLength)
  528. }
  529. for i := 0; i < dbSize; i++ {
  530. C.readData(C.int(i), (*C.uchar)(&tmpdbLeader[i][0]))
  531. }
  532. //writes seed to follower
  533. writeTo(followerConnection, seedLeader)
  534. //write data to follower
  535. //this is surely inefficent
  536. for i := 0; i < dbSize; i++ {
  537. writeTo(followerConnection, tmpdbLeader[i])
  538. }
  539. //receive seed from follower
  540. seedFollower, _ := readFrom(followerConnection, 16, nil, 0)
  541. //receive data from follower
  542. tmpdbFollower := make([][]byte, dbSize)
  543. for i := range tmpdbFollower {
  544. tmpdbFollower[i] = make([]byte, dataLength)
  545. }
  546. for i := 0; i < dbSize; i++ {
  547. tmpdbFollower[i], _ = readFrom(followerConnection, dataLength, nil, 0)
  548. }
  549. //put together the db
  550. tmpdb := make([][]byte, dbSize)
  551. for i := range tmpdb {
  552. tmpdb[i] = make([]byte, dataLength)
  553. }
  554. //get own Ciphers
  555. ciphersLeader := make([]*C.uchar, dbSize)
  556. for i := 0; i < dbSize; i++ {
  557. ciphersLeader[i] = (*C.uchar)(C.malloc(16))
  558. }
  559. for i := 0; i < dbSize; i++ {
  560. C.getCipher(1, C.int(i), ciphersLeader[i])
  561. }
  562. //send own Ciphers to follower
  563. for i := 0; i < dbSize; i++ {
  564. writeTo(followerConnection, C.GoBytes(unsafe.Pointer(ciphersLeader[i]), 16))
  565. }
  566. //receive ciphers from follower
  567. ciphersFollower := make([]byte, dbSize*16)
  568. for i := 0; i < dbSize; i++ {
  569. _, err := followerConnection.Read(ciphersFollower[i*16:])
  570. if err != nil {
  571. panic(err)
  572. }
  573. }
  574. //put in ciphers from follower
  575. for i := 0; i < dbSize; i++ {
  576. C.putCipher(1, C.int(i), (*C.uchar)(&ciphersFollower[i*16]))
  577. }
  578. //decrypt each row
  579. for i := 0; i < dbSize; i++ {
  580. C.decryptRow(C.int(i), (*C.uchar)(&tmpdb[i][0]), (*C.uchar)(&tmpdbLeader[i][0]), (*C.uchar)(&tmpdbFollower[i][0]), (*C.uchar)(&seedLeader[0]), (*C.uchar)(&seedFollower[0]))
  581. }
  582. var tweets []lib.Tweet
  583. var currentPublisherAmount float64
  584. var collisions float64
  585. for i := 0; i < dbSize; i++ {
  586. //discard cover message
  587. if tmpdb[i][1] == 0 {
  588. continue
  589. //collision
  590. } else if -1 == strings.Index(string(tmpdb[i]), ";;") {
  591. currentPublisherAmount++
  592. currentPublisherAmount++
  593. collisions++
  594. continue
  595. } else {
  596. currentPublisherAmount++
  597. //reconstruct tweet
  598. var position int = 0
  599. var topics []string
  600. var topic string
  601. var text string
  602. for _, letter := range tmpdb[i] {
  603. if string(letter) == ";" {
  604. if topic != "" {
  605. topics = append(topics, topic)
  606. topic = ""
  607. }
  608. position++
  609. } else {
  610. if position == 0 {
  611. if string(letter) == "," {
  612. topics = append(topics, topic)
  613. topic = ""
  614. } else {
  615. //if topics are
  616. //int
  617. //topic = topic + fmt.Sprint(int(letter))
  618. //string
  619. topic = topic + string(letter)
  620. }
  621. } else if position == 1 {
  622. text = text + string(letter)
  623. }
  624. }
  625. }
  626. tweet := lib.Tweet{"", -1, topics, text, round}
  627. if text != "" {
  628. tweets = append(tweets, tweet)
  629. } else {
  630. //this is a odd(number) way collisions
  631. collisions++
  632. }
  633. }
  634. }
  635. collisionCounter = append(collisionCounter, collisions)
  636. log.Println("Collision percentage this round", collisions/dbWriteSize, "dbWriteSize", dbWriteSize)
  637. lib.NewEntries(tweets, 0)
  638. C.resetDb()
  639. //calculates the publisherAverage over the last publisherRounds rounds
  640. index := round % publisherRounds
  641. publisherHistory[index] = int(currentPublisherAmount)
  642. log.Println("currentPublisherAmount", currentPublisherAmount, "publisher percentage", currentPublisherAmount/clientsConnected)
  643. var publisherAmount int
  644. for _, num := range publisherHistory {
  645. publisherAmount += num
  646. }
  647. publisherAverage := 0
  648. if round < publisherRounds {
  649. publisherAverage = publisherAmount / round
  650. } else {
  651. publisherAverage = publisherAmount / publisherRounds
  652. }
  653. //calculates the dbWriteSize for this round
  654. dbWriteSize = math.Ceil(dbSizeFactor * float64(publisherAverage))
  655. if dbWriteSize < minDBWriteSize {
  656. dbWriteSize = minDBWriteSize
  657. }
  658. //writes dbWriteSize of current round to follower
  659. writeTo(followerConnection, intToByte(int(dbWriteSize)))
  660. lib.CleanUpdbR(round)
  661. }
  662. //opti! mb it is quicker to send updated topicLists to clients first so pirQuerys are ready
  663. func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGroup, m *sync.RWMutex) {
  664. gotClient := make([]byte, 1)
  665. gotClient[0] = 0
  666. //wait until time is up
  667. for len(phase3Channel) == 0 {
  668. if time.Since(startTimeRound) > maxTimePerRound*2 {
  669. //tells follower that this worker is done
  670. writeToWError(followerConnection, gotClient, nil, 0)
  671. wg.Done()
  672. return
  673. }
  674. time.Sleep(1 * time.Second)
  675. }
  676. for clientConnection := range phase3Channel {
  677. clientsServedPhase3[round] = clientsServedPhase3[round] + 1
  678. gotClient[0] = 1
  679. //tells follower that this worker got a clientConnection
  680. writeToWError(followerConnection, gotClient, nil, 0)
  681. //tells client current phase
  682. errorBool := writeToWError(clientConnection, phase, followerConnection, 2)
  683. if errorBool {
  684. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  685. if contBool {
  686. continue
  687. } else {
  688. return
  689. }
  690. }
  691. /*
  692. possible Values
  693. 0 : new client
  694. leader expects sharedSecrets, expects pirQuery
  695. 1 : update needed
  696. leader sends topicList, performs local update of sharedSecret, expects pirQuery
  697. 2 : no update needed
  698. nothing
  699. */
  700. subPhase := make([]byte, 1)
  701. //gets the data for the current client
  702. m.RLock()
  703. var clientKeys = clientData[clientConnection.RemoteAddr()]
  704. m.RUnlock()
  705. var roundsParticipating = clientKeys.roundsParticipating
  706. //client participates for the first time
  707. if roundsParticipating == 0 {
  708. subPhase[0] = 0
  709. } else if roundsParticipating%roundsBeforeUpdate == 0 {
  710. subPhase[0] = 1
  711. } else {
  712. subPhase[0] = 2
  713. }
  714. //tells client what leader expects
  715. errorBool = writeToWError(clientConnection, subPhase, followerConnection, 2)
  716. if errorBool {
  717. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  718. if contBool {
  719. continue
  720. } else {
  721. return
  722. }
  723. }
  724. //tells follower what will happen
  725. writeToWError(followerConnection, subPhase, nil, 0)
  726. //sends clients publicKey so follower knows which client is being served
  727. writeTo(followerConnection, clientKeys.PublicKey[:])
  728. //increases rounds participating for client
  729. clientKeys.roundsParticipating = roundsParticipating + 1
  730. //declaring variables here to prevent dupclicates later
  731. m.RLock()
  732. var sharedSecret [32]byte = clientData[clientConnection.RemoteAddr()].SharedSecret
  733. m.RUnlock()
  734. if subPhase[0] == 0 {
  735. errorBool := sendTopicLists(clientConnection, followerConnection, false)
  736. if errorBool {
  737. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  738. if contBool {
  739. continue
  740. } else {
  741. return
  742. }
  743. }
  744. clientKeys, _, errorBool = handlePirQuery(clientKeys, clientConnection, followerConnection, int(subPhase[0]), false)
  745. if errorBool {
  746. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  747. if contBool {
  748. continue
  749. } else {
  750. return
  751. }
  752. }
  753. } else if subPhase[0] == 1 {
  754. errorBool := sendTopicLists(clientConnection, followerConnection, false)
  755. if errorBool {
  756. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  757. if contBool {
  758. continue
  759. } else {
  760. return
  761. }
  762. }
  763. //updates sharedSecret
  764. sharedSecret = sha256.Sum256(sharedSecret[:])
  765. clientKeys.SharedSecret = sharedSecret
  766. clientKeys, _, errorBool = handlePirQuery(clientKeys, clientConnection, followerConnection, int(subPhase[0]), false)
  767. if errorBool {
  768. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  769. if contBool {
  770. continue
  771. } else {
  772. return
  773. }
  774. }
  775. }
  776. errorBool = getSendTweets(clientKeys, nil, clientConnection, followerConnection)
  777. if errorBool {
  778. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  779. if contBool {
  780. continue
  781. } else {
  782. return
  783. }
  784. }
  785. wantsArchive, errorBool := readFrom(clientConnection, 1, followerConnection, 2)
  786. if errorBool {
  787. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  788. if contBool {
  789. continue
  790. } else {
  791. return
  792. }
  793. }
  794. writeToWError(followerConnection, wantsArchive, nil, 0)
  795. if wantsArchive[0] == 1 && archiveTopicAmount > 0 {
  796. _, archiveQuerys, errorBool := handlePirQuery(clientKeys, clientConnection, followerConnection, -1, false)
  797. if errorBool {
  798. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  799. if contBool {
  800. continue
  801. } else {
  802. return
  803. }
  804. }
  805. errorBool = getSendTweets(clientKeys, archiveQuerys, clientConnection, followerConnection)
  806. if errorBool {
  807. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  808. if contBool {
  809. continue
  810. } else {
  811. return
  812. }
  813. }
  814. }
  815. //saves all changes for client
  816. m.Lock()
  817. clientData[clientConnection.RemoteAddr()] = clientKeys
  818. m.Unlock()
  819. phase1Channel <- clientConnection
  820. for {
  821. if time.Since(startTimeRound) < 2*maxTimePerRound || len(phase3Channel) > 0 {
  822. //this worker handles the next client
  823. if len(phase3Channel) > 0 {
  824. break
  825. //this worker waits for next client
  826. } else {
  827. time.Sleep(1 * time.Second)
  828. }
  829. //times up
  830. } else {
  831. //tells follower that this worker is done
  832. gotClient[0] = 0
  833. writeToWError(followerConnection, gotClient, nil, 0)
  834. wg.Done()
  835. return
  836. }
  837. }
  838. }
  839. }
  840. //returns true if there is another client
  841. func handleClientDC(wg *sync.WaitGroup, followerConnection net.Conn, channel chan net.Conn) bool {
  842. //loop that waits for new client or leaves phase1 if time is up
  843. for {
  844. if time.Since(startTimeRound) < maxTimePerRound {
  845. //this worker handles the next client
  846. if len(channel) > 0 {
  847. return true
  848. //this worker waits for next client
  849. } else {
  850. time.Sleep(1 * time.Second)
  851. }
  852. //times up
  853. } else {
  854. //tells follower that this worker is done
  855. gotClient := make([]byte, 1)
  856. gotClient[0] = 0
  857. writeTo(followerConnection, gotClient)
  858. wg.Done()
  859. return false
  860. }
  861. }
  862. }
  863. func createVirtualAddresses() []int {
  864. //array will be filled with unique random ascending values
  865. //adapted from: https://stackoverflow.com/questions/20039025/java-array-of-unique-randomly-generated-integers
  866. //+extraPositions to have a position to evaluate each received message
  867. arraySize := int(dbWriteSize) + extraPositions
  868. var maxInt int = int(math.Pow(2, 31))
  869. virtualAddresses := make([]int, arraySize)
  870. for i := 0; i < arraySize; i++ {
  871. virtualAddresses[i] = mr.Intn(maxInt)
  872. for j := 0; j < i; j++ {
  873. if virtualAddresses[i] == virtualAddresses[j] {
  874. i--
  875. break
  876. }
  877. }
  878. }
  879. sort.Ints(virtualAddresses)
  880. return virtualAddresses
  881. }
  882. func getSendVirtualAddress(pirQuery []byte, virtualAddresses []int, sharedSecret [32]byte, clientConnection, followerConnection net.Conn) bool {
  883. //xores all requested addresses into virtuallAddress
  884. virtualAddress := make([]byte, 4)
  885. for index, num := range pirQuery {
  886. if num == 1 {
  887. currentAddress := intToByte(virtualAddresses[index])
  888. for i := 0; i < 4; i++ {
  889. virtualAddress[i] = virtualAddress[i] ^ currentAddress[i]
  890. }
  891. }
  892. }
  893. //xores the sharedSecret
  894. for i := 0; i < 4; i++ {
  895. virtualAddress[i] = virtualAddress[i] ^ sharedSecret[i]
  896. }
  897. virtualAddressFollower, _ := readFrom(followerConnection, 4, nil, 0)
  898. //xores the data from follower
  899. for i := 0; i < 4; i++ {
  900. virtualAddress[i] = virtualAddress[i] ^ virtualAddressFollower[i]
  901. }
  902. errorBool := writeToWError(clientConnection, virtualAddress, followerConnection, 5)
  903. return errorBool
  904. }
  905. func getSendTweets(clientKeys clientKeys, archiveQuerys [][]byte, clientConnection, followerConnection net.Conn) bool {
  906. tmpNeededSubscriptions := neededSubscriptions
  907. if tmpNeededSubscriptions > topicAmount {
  908. tmpNeededSubscriptions = topicAmount
  909. }
  910. if archiveQuerys != nil {
  911. tmpNeededSubscriptions = len(archiveQuerys)
  912. if tmpNeededSubscriptions > archiveTopicAmount {
  913. tmpNeededSubscriptions = archiveTopicAmount
  914. }
  915. }
  916. tweets := make([][]byte, tmpNeededSubscriptions)
  917. for i := 0; i < tmpNeededSubscriptions; i++ {
  918. //gets all requested tweets
  919. if archiveQuerys == nil {
  920. tweets[i] = lib.GetTweets(clientKeys.PirQuery[i], dataLength, 0, *clientKeys.PublicKey)
  921. } else {
  922. tweets[i] = lib.GetTweets(archiveQuerys[i], dataLength, 1, *clientKeys.PublicKey)
  923. }
  924. //expand sharedSecret so it is of right length
  925. expandBy := len(tweets[i]) / 32
  926. var expandedSharedSecret []byte
  927. for i := 0; i < expandBy; i++ {
  928. expandedSharedSecret = append(expandedSharedSecret, clientKeys.SharedSecret[:]...)
  929. }
  930. //Xor's sharedSecret with all tweets
  931. lib.Xor(expandedSharedSecret[:], tweets[i])
  932. blockLength := len(tweets[i])
  933. receivedTweets, _ := readFrom(followerConnection, blockLength, nil, 0)
  934. lib.Xor(receivedTweets, tweets[i])
  935. }
  936. //sends tweets to client
  937. for i := 0; i < tmpNeededSubscriptions; i++ {
  938. tweetsLengthBytes := intToByte(len(tweets[i]))
  939. if firstTweetSend && archiveQuerys == nil {
  940. firstTweetSend = false
  941. log.Println("sending", len(tweets[0]), "bytes of data")
  942. }
  943. errorBool := writeToWError(clientConnection, tweetsLengthBytes, followerConnection, 2)
  944. if errorBool {
  945. return true
  946. }
  947. errorBool = writeToWError(clientConnection, tweets[i], followerConnection, 2)
  948. if errorBool {
  949. return true
  950. }
  951. }
  952. return false
  953. }
  954. func handlePirQuery(clientKeys clientKeys, clientConnection net.Conn, followerConnection net.Conn, subPhase int, doAuditing bool) (clientKeys, [][]byte, bool) {
  955. clientPublicKey := clientKeys.PublicKey
  956. //gets the msg length
  957. msgLengthBytes, errorBool := readFrom(clientConnection, 4, followerConnection, 5)
  958. if errorBool {
  959. return clientKeys, nil, true
  960. }
  961. msgLength := byteToInt(msgLengthBytes)
  962. //gets the leader box
  963. leaderBox, errorBool := readFrom(clientConnection, msgLength, followerConnection, 5)
  964. if errorBool {
  965. return clientKeys, nil, true
  966. }
  967. //gets the follower box
  968. followerBox, errorBool := readFrom(clientConnection, msgLength, followerConnection, 5)
  969. if errorBool {
  970. return clientKeys, nil, true
  971. }
  972. tmpNeededSubscriptions := neededSubscriptions
  973. if tmpNeededSubscriptions > topicAmount {
  974. tmpNeededSubscriptions = topicAmount
  975. }
  976. tmpTopicAmount := topicAmount
  977. if subPhase == -1 {
  978. archiveNeededSubscriptions, errorBool := readFrom(clientConnection, 4, followerConnection, 5)
  979. if errorBool {
  980. return clientKeys, nil, true
  981. }
  982. writeToWError(followerConnection, archiveNeededSubscriptions, nil, 0)
  983. tmpNeededSubscriptions = byteToInt(archiveNeededSubscriptions)
  984. tmpTopicAmount = archiveTopicAmount
  985. if tmpNeededSubscriptions > archiveTopicAmount {
  986. tmpNeededSubscriptions = archiveTopicAmount
  987. }
  988. }
  989. if doAuditing {
  990. tmpNeededSubscriptions = 1
  991. tmpTopicAmount = int(dbWriteSize)
  992. }
  993. //send length to follower
  994. writeToWError(followerConnection, msgLengthBytes, nil, 0)
  995. //send box to follower
  996. writeToWError(followerConnection, followerBox, nil, 0)
  997. var decryptNonce [24]byte
  998. copy(decryptNonce[:], leaderBox[:24])
  999. decrypted, ok := box.Open(nil, leaderBox[24:], &decryptNonce, clientPublicKey, leaderPrivateKey)
  1000. if !ok {
  1001. fmt.Println("pirQuery decryption not ok")
  1002. return clientKeys, nil, true
  1003. }
  1004. //if sharedSecret is send
  1005. if subPhase == 0 {
  1006. var tmpSharedSecret [32]byte
  1007. for index := 0; index < 32; index++ {
  1008. tmpSharedSecret[index] = decrypted[index]
  1009. }
  1010. clientKeys.SharedSecret = tmpSharedSecret
  1011. decrypted = decrypted[32:]
  1012. }
  1013. if doAuditing {
  1014. result := make([][]byte, 1)
  1015. result[0] = decrypted
  1016. return clientKeys, result, false
  1017. }
  1018. if subPhase == -1 {
  1019. }
  1020. //transforms byteArray to ints of wanted topics
  1021. pirQueryFlattened := decrypted
  1022. pirQuerys := make([][]byte, tmpNeededSubscriptions)
  1023. for i := range pirQuerys {
  1024. pirQuerys[i] = make([]byte, tmpTopicAmount)
  1025. }
  1026. for i := 0; i < tmpNeededSubscriptions; i++ {
  1027. pirQuerys[i] = pirQueryFlattened[i*tmpTopicAmount : (i+1)*tmpTopicAmount]
  1028. }
  1029. //sets the pirQuery for the client in case whe are not archiving, and not Auditing
  1030. if subPhase != -1 {
  1031. clientKeys.PirQuery = pirQuerys
  1032. }
  1033. return clientKeys, pirQuerys, false
  1034. }
  // transformBytesToStringArray splits a comma-terminated list of topics into
  // its entries. Every topic must be followed by a ','; bytes after the last
  // comma are not returned. The result is nil when the input holds no comma.
  //
  // Topics are cut out of the input as raw bytes, so multi-byte UTF-8
  // characters are kept intact.
  func transformBytesToStringArray(topicsAsBytes []byte) []string {
  	var topics []string
  	start := 0
  	for i, b := range topicsAsBytes {
  		if b != ',' {
  			continue
  		}
  		topics = append(topics, string(topicsAsBytes[start:i]))
  		start = i + 1
  	}
  	return topics
  }
  // byteToInt decodes the first four bytes of myBytes as a little-endian
  // integer (myBytes[0] is the least significant byte). It is the inverse of
  // intToByte. myBytes must hold at least four bytes.
  func byteToInt(myBytes []byte) (x int) {
  	// start at the most significant byte and shift it up step by step
  	for i := 3; i >= 0; i-- {
  		x = x<<8 | int(myBytes[i])
  	}
  	return x
  }
  1054. //returns true if error occured
  1055. func sendTopicLists(clientConnection, followerConnection net.Conn, setup bool) bool {
  1056. for i := 0; i < 2; i++ {
  1057. var topicList []byte
  1058. if i == 0 {
  1059. topicList, topicAmount = lib.GetTopicList(i)
  1060. } else {
  1061. topicList, archiveTopicAmount = lib.GetTopicList(i)
  1062. }
  1063. topicListLengthBytes := intToByte(len(topicList))
  1064. if !setup {
  1065. err := writeToWError(clientConnection, topicListLengthBytes, followerConnection, 5)
  1066. if err {
  1067. return true
  1068. }
  1069. err = writeToWError(clientConnection, topicList, followerConnection, 5)
  1070. if err {
  1071. return true
  1072. }
  1073. } else {
  1074. _, err := clientConnection.Write(topicListLengthBytes)
  1075. if err != nil {
  1076. return true
  1077. }
  1078. _, err = clientConnection.Write(topicList)
  1079. if err != nil {
  1080. return true
  1081. }
  1082. }
  1083. }
  1084. return false
  1085. }
  1086. //sends the array to the connection
  1087. func writeTo(connection net.Conn, array []byte) {
  1088. remainingLength := len(array)
  1089. for remainingLength > 0 {
  1090. if remainingLength >= mtu {
  1091. _, err := connection.Write(array[:mtu])
  1092. if err != nil {
  1093. panic(err)
  1094. }
  1095. array = array[mtu:]
  1096. remainingLength -= mtu
  1097. } else {
  1098. _, err := connection.Write(array)
  1099. if err != nil {
  1100. panic(err)
  1101. }
  1102. remainingLength = 0
  1103. }
  1104. }
  1105. }
  1106. func writeToWError(connection net.Conn, array []byte, followerConnection net.Conn, size int) bool {
  1107. if connection.RemoteAddr().String() == follower {
  1108. arrayWError := make([]byte, 1)
  1109. arrayWError = append(arrayWError, array[:]...)
  1110. remainingLength := len(arrayWError)
  1111. for remainingLength > 0 {
  1112. if remainingLength >= mtu {
  1113. _, err := connection.Write(arrayWError[:mtu])
  1114. if err != nil {
  1115. return handleError(connection, followerConnection, size, err)
  1116. }
  1117. arrayWError = arrayWError[mtu:]
  1118. remainingLength -= mtu
  1119. } else {
  1120. _, err := connection.Write(arrayWError)
  1121. if err != nil {
  1122. return handleError(connection, followerConnection, size, err)
  1123. }
  1124. remainingLength = 0
  1125. }
  1126. }
  1127. } else {
  1128. remainingLength := len(array)
  1129. for remainingLength > 0 {
  1130. if remainingLength >= mtu {
  1131. _, err := connection.Write(array[:mtu])
  1132. if err != nil {
  1133. return handleError(connection, followerConnection, size, err)
  1134. }
  1135. array = array[mtu:]
  1136. remainingLength -= mtu
  1137. } else {
  1138. _, err := connection.Write(array)
  1139. if err != nil {
  1140. return handleError(connection, followerConnection, size, err)
  1141. }
  1142. remainingLength = 0
  1143. }
  1144. }
  1145. }
  1146. return false
  1147. }
  1148. func handleError(connection, followerConnection net.Conn, size int, err error) bool {
  1149. if err != nil {
  1150. //lets follower know that client has disconnected unexpectedly
  1151. if connection.RemoteAddr().String() != follower {
  1152. if size > mtu {
  1153. fmt.Println("have a look here")
  1154. }
  1155. fmt.Println("handleError", err)
  1156. array := make([]byte, size)
  1157. array[0] = 1
  1158. _, err = followerConnection.Write(array)
  1159. if err != nil {
  1160. panic(err)
  1161. }
  1162. return true
  1163. } else {
  1164. panic(err)
  1165. }
  1166. }
  1167. return false
  1168. }
  1169. //reads an array which is returned and of size "size" from the connection
  1170. //returns true if error occured
  1171. func readFrom(connection net.Conn, size int, followerConnection net.Conn, sizeError int) ([]byte, bool) {
  1172. var array []byte
  1173. remainingSize := size
  1174. for remainingSize > 0 {
  1175. var err error
  1176. toAppend := make([]byte, mtu)
  1177. if remainingSize > mtu {
  1178. _, err = connection.Read(toAppend)
  1179. array = append(array, toAppend...)
  1180. remainingSize -= mtu
  1181. } else {
  1182. _, err = connection.Read(toAppend[:remainingSize])
  1183. array = append(array, toAppend[:remainingSize]...)
  1184. remainingSize = 0
  1185. }
  1186. if err != nil {
  1187. //lets follower know that client has disconnected unexpectedly
  1188. if connection.RemoteAddr().String() != follower {
  1189. fmt.Println(err)
  1190. array := make([]byte, sizeError)
  1191. array[0] = 1
  1192. _, err = followerConnection.Write(array)
  1193. if err != nil {
  1194. panic(err)
  1195. }
  1196. return nil, true
  1197. } else {
  1198. panic(err)
  1199. }
  1200. }
  1201. }
  1202. return array, false
  1203. }
  // intToByte encodes the lowest 32 bits of myInt as four little-endian bytes
  // (retBytes[0] is the least significant byte). It is the inverse of
  // byteToInt.
  func intToByte(myInt int) (retBytes []byte) {
  	retBytes = make([]byte, 4)
  	for i := range retBytes {
  		// the conversion to byte keeps only the lowest eight bits
  		retBytes[i] = byte(myInt >> uint(8*i))
  	}
  	return retBytes
  }