leader.go 36 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434
  1. package main
  2. //#cgo CFLAGS: -fopenmp -O2
  3. //#cgo LDFLAGS: -lcrypto -lm -fopenmp
  4. //#include "../c/dpf.h"
  5. //#include "../c/okv.h"
  6. //#include "../c/dpf.c"
  7. //#include "../c/okv.c"
  8. import "C"
  9. import (
  10. "crypto/rand"
  11. "crypto/rsa"
  12. "crypto/sha256"
  13. "crypto/tls"
  14. "crypto/x509"
  15. "crypto/x509/pkix"
  16. "encoding/pem"
  17. "fmt"
  18. "log"
  19. "math"
  20. "math/big"
  21. mr "math/rand"
  22. "net"
  23. "os"
  24. "sort"
  25. "strings"
  26. "sync"
  27. "time"
  28. lib "2PPS/lib"
  29. "unsafe"
  30. "golang.org/x/crypto/nacl/box"
  31. )
  32. //this stores all neccessary information for each client
  33. type clientKeys struct {
  34. roundsParticipating int
  35. PublicKey *[32]byte
  36. SharedSecret [32]byte
  37. PirQuery [][]byte
  38. }
  39. var clientData = make(map[net.Addr]clientKeys)
  40. const follower string = "127.0.0.1:4442"
  41. //Maximum Transport Unit
  42. const mtu int = 1100
  43. var leaderPrivateKey *[32]byte
  44. var leaderPublicKey *[32]byte
  45. var followerPublicKey *[32]byte
  46. const maxNumberOfClients = 1000000
  47. var topicList []byte
  48. var topicAmount int
  49. var archiveTopicAmount int
  50. // every roundsBeforeUpdate the client updates his pirQuery
  51. const roundsBeforeUpdate = 2
  52. const neededSubscriptions = 5
  53. const numThreads = 12
  54. const dataLength = 256
  55. const numClients = 2000
  56. const minDBWriteSize = numClients * 0.35
  57. //riposte says 19.5
  58. const dbSizeFactor = 5
  59. const publisherGuess = 0.15
  60. var dbWriteSize float64 = numClients * dbSizeFactor * publisherGuess
  61. //this is the number of positions for auditing
  62. var extraPositions int = 2
  63. var collisionCounter []float64
  64. var clientsConnected float64
  65. var maxTimePerRound time.Duration = 1000 * time.Second
  66. //counts the number of rounds
  67. var round int
  68. var clientsServedPhase1 []int
  69. var clientsServedPhase3 []int
  70. var startPhase1 time.Time
  71. var startPhase2 time.Time
  72. var startPhase3 time.Time
  73. //only prints auditing time for one client per round
  74. var firstAuditPrint bool
  75. var auditingStart time.Time
  76. var auditingEnd time.Time
  77. var startTime time.Time
  78. var startTimeRound time.Time
  79. //channel for goroutine communication with clients
  80. var phase1Channel = make(chan net.Conn, maxNumberOfClients)
  81. var phase3Channel = make(chan net.Conn, maxNumberOfClients)
  82. //variables for calculating the dbWrite size
  83. const publisherRounds int = 10
  84. var publisherAmount float64
  85. var publisherHistory [publisherRounds]int
  86. var firstTweetSend bool
  87. func main() {
  88. f, err := os.OpenFile("evalData", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
  89. if err != nil {
  90. log.Fatalf("error opening file: %v", err)
  91. }
  92. defer f.Close()
  93. log.SetOutput(f)
  94. log.Println("roundsBeforeUpdate", roundsBeforeUpdate)
  95. log.Println("neededSubscriptions", neededSubscriptions)
  96. log.Println("dataLength", dataLength)
  97. log.Println("maxTimePerRound~", maxTimePerRound*3)
  98. log.Println("extraPositions", extraPositions)
  99. log.Println("numClients", numClients)
  100. log.Println("Archiving is done every 24h, no Clients wants Archive")
  101. //prevents race conditions for wrtiting
  102. m := &sync.RWMutex{}
  103. startTime = time.Now()
  104. clientsServedPhase1 = make([]int, 1000)
  105. clientsServedPhase3 = make([]int, 1000)
  106. generatedPublicKey, generatedPrivateKey, err := box.GenerateKey(rand.Reader)
  107. if err != nil {
  108. panic(err)
  109. }
  110. //why is this neccessary?
  111. leaderPrivateKey = generatedPrivateKey
  112. leaderPublicKey = generatedPublicKey
  113. C.initializeServer(C.int(numThreads))
  114. //calls follower for setup
  115. conf := &tls.Config{
  116. InsecureSkipVerify: true,
  117. }
  118. followerConnection, err := tls.Dial("tcp", follower, conf)
  119. if err != nil {
  120. panic(err)
  121. }
  122. followerConnection.SetDeadline(time.Time{})
  123. //receives follower publicKey
  124. var tmpFollowerPubKey [32]byte
  125. _, err = followerConnection.Read(tmpFollowerPubKey[:])
  126. if err != nil {
  127. panic(err)
  128. }
  129. followerPublicKey = &tmpFollowerPubKey
  130. //send publicKey to follower
  131. writeTo(followerConnection, leaderPublicKey[:])
  132. writeTo(followerConnection, intToByte(neededSubscriptions))
  133. writeTo(followerConnection, intToByte(int(dbWriteSize)))
  134. //goroutine for accepting new clients
  135. go func() {
  136. leaderConnectionPrivateKey, err := rsa.GenerateKey(rand.Reader, 2048)
  137. if err != nil {
  138. panic(err)
  139. }
  140. // Generate a pem block with the private key
  141. keyPem := pem.EncodeToMemory(&pem.Block{
  142. Type: "RSA PRIVATE KEY",
  143. Bytes: x509.MarshalPKCS1PrivateKey(leaderConnectionPrivateKey),
  144. })
  145. tml := x509.Certificate{
  146. // you can add any attr that you need
  147. NotBefore: time.Now(),
  148. NotAfter: time.Now().AddDate(5, 0, 0),
  149. // you have to generate a different serial number each execution
  150. SerialNumber: big.NewInt(123123),
  151. Subject: pkix.Name{
  152. CommonName: "New Name",
  153. Organization: []string{"New Org."},
  154. },
  155. BasicConstraintsValid: true,
  156. }
  157. cert, err := x509.CreateCertificate(rand.Reader, &tml, &tml, &leaderConnectionPrivateKey.PublicKey, leaderConnectionPrivateKey)
  158. if err != nil {
  159. panic(err)
  160. }
  161. // Generate a pem block with the certificate
  162. certPem := pem.EncodeToMemory(&pem.Block{
  163. Type: "CERTIFICATE",
  164. Bytes: cert,
  165. })
  166. tlsCert, err := tls.X509KeyPair(certPem, keyPem)
  167. if err != nil {
  168. panic(err)
  169. }
  170. config := &tls.Config{Certificates: []tls.Certificate{tlsCert}}
  171. //listens for clients
  172. lnClients, err := tls.Listen("tcp", ":4441", config)
  173. if err != nil {
  174. panic(err)
  175. }
  176. defer lnClients.Close()
  177. for {
  178. clientConnection, err := lnClients.Accept()
  179. if err != nil {
  180. fmt.Println("Client connection error 1", err)
  181. clientConnection.Close()
  182. break
  183. }
  184. clientConnection.SetDeadline(time.Time{})
  185. //sends topicList so client can participate in phase 3 asap
  186. errorBool := sendTopicLists(clientConnection, followerConnection, true)
  187. if errorBool {
  188. break
  189. }
  190. //send leader publicKey
  191. _, err = clientConnection.Write(leaderPublicKey[:])
  192. if err != nil {
  193. fmt.Println("Client connection error 2", err)
  194. clientConnection.Close()
  195. break
  196. }
  197. //send follower publicKey
  198. _, err = clientConnection.Write(followerPublicKey[:])
  199. if err != nil {
  200. fmt.Println("Client connection error 3", err)
  201. clientConnection.Close()
  202. break
  203. }
  204. var clientPublicKey *[32]byte
  205. var tmpClientPublicKey [32]byte
  206. //gets publicKey from client
  207. _, err = clientConnection.Read(tmpClientPublicKey[:])
  208. if err != nil {
  209. fmt.Println("Client connection error 4", err)
  210. clientConnection.Close()
  211. break
  212. }
  213. _, err = clientConnection.Write(intToByte(neededSubscriptions))
  214. if err != nil {
  215. fmt.Println("Client connection error 5", err)
  216. clientConnection.Close()
  217. break
  218. }
  219. _, err = clientConnection.Write(intToByte(int(startTime.Unix())))
  220. if err != nil {
  221. fmt.Println("Client connection error 6", err)
  222. clientConnection.Close()
  223. break
  224. }
  225. clientPublicKey = &tmpClientPublicKey
  226. //this is the key for map(client data)
  227. remoteAddress := clientConnection.RemoteAddr()
  228. //pirQuery will be added in phase 3
  229. //bs! only want to set roundsParticipating and answerAmount to 0, mb there is a better way
  230. //will work for now
  231. var emptyArray [32]byte
  232. var emptyByteArray [][]byte
  233. keys := clientKeys{0, clientPublicKey, emptyArray, emptyByteArray}
  234. m.Lock()
  235. clientData[remoteAddress] = keys
  236. m.Unlock()
  237. phase1Channel <- clientConnection
  238. clientsConnected++
  239. }
  240. }()
  241. wg := &sync.WaitGroup{}
  242. //the current phase
  243. phase := make([]byte, 1)
  244. for {
  245. startPhase1 = time.Now()
  246. startTimeRound = time.Now()
  247. firstAuditPrint = true
  248. phase[0] = 1
  249. round++
  250. fmt.Println("clientsServedPhase1", clientsServedPhase1[round-1])
  251. fmt.Println("clientsServedPhase3", clientsServedPhase3[round-1])
  252. log.Println()
  253. log.Println("Phase 1 Round", round)
  254. log.Println()
  255. bytesSaved := lib.GetBytesSaved()
  256. log.Println("bytesSaved Percentage", bytesSaved)
  257. //creates a new write Db for this round
  258. for i := 0; i < int(dbWriteSize); i++ {
  259. C.createDb(C.int(1), C.int(dataLength))
  260. }
  261. //creates a new db containing virtual addresses for auditing
  262. virtualAddresses := createVirtualAddresses()
  263. //send all virtualAddresses to follower
  264. for i := 0; i <= int(dbWriteSize); i++ {
  265. writeTo(followerConnection, intToByte(virtualAddresses[i]))
  266. }
  267. //moves all clients to phase1
  268. if len(phase3Channel) > 0 {
  269. for client := range phase3Channel {
  270. phase1Channel <- client
  271. if len(phase3Channel) == 0 {
  272. break
  273. }
  274. }
  275. }
  276. for id := 0; id < numThreads; id++ {
  277. wg.Add(1)
  278. followerConnection, err := tls.Dial("tcp", follower, conf)
  279. if err != nil {
  280. panic(err)
  281. }
  282. followerConnection.SetDeadline(time.Time{})
  283. go phase1(id, phase, followerConnection, wg, m, virtualAddresses)
  284. }
  285. wg.Wait()
  286. log.Println("fullDurationPhase1", time.Since(startPhase1).Seconds())
  287. log.Println("fullAuditingDuration~", auditingEnd.Sub(auditingStart).Seconds()*clientsConnected)
  288. log.Println("auditingPercentage", (auditingEnd.Sub(auditingStart).Seconds()*clientsConnected)/(time.Since(startPhase1).Seconds()))
  289. //Phase 2
  290. startPhase2 = time.Now()
  291. followerConnection, err := tls.Dial("tcp", follower, conf)
  292. if err != nil {
  293. panic(err)
  294. }
  295. followerConnection.SetDeadline(time.Time{})
  296. phase2(followerConnection)
  297. log.Println("fullDurationPhase2", time.Since(startPhase2).Seconds())
  298. //Phase 3
  299. //moves all clients to phase3
  300. if len(phase1Channel) > 0 {
  301. for client := range phase1Channel {
  302. phase3Channel <- client
  303. if len(phase1Channel) == 0 {
  304. break
  305. }
  306. }
  307. }
  308. startPhase3 = time.Now()
  309. //no tweets -> continue to phase 1 and mb get tweets
  310. topicList, topicAmount = lib.GetTopicList(0)
  311. if len(topicList) == 0 {
  312. continue
  313. }
  314. firstTweetSend = true
  315. phase[0] = 3
  316. startTimeRound = time.Now()
  317. for id := 0; id < numThreads; id++ {
  318. wg.Add(1)
  319. followerConnection, err := tls.Dial("tcp", follower, conf)
  320. if err != nil {
  321. panic(err)
  322. }
  323. followerConnection.SetDeadline(time.Time{})
  324. go phase3(id, phase, followerConnection, wg, m)
  325. }
  326. wg.Wait()
  327. log.Println("fullDurationPhase3", time.Since(startPhase3).Seconds())
  328. lib.CleanUpdbR(round)
  329. }
  330. }
  331. func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGroup, m *sync.RWMutex, virtualAddresses []int) {
  332. roundAsBytes := intToByte(round)
  333. gotClient := make([]byte, 1)
  334. gotClient[0] = 0
  335. //wait until time is up
  336. for len(phase1Channel) == 0 {
  337. if time.Since(startTimeRound) > maxTimePerRound {
  338. //tells follower that this worker is done
  339. writeTo(followerConnection, gotClient)
  340. wg.Done()
  341. return
  342. }
  343. time.Sleep(1 * time.Second)
  344. }
  345. for clientConnection := range phase1Channel {
  346. clientsServedPhase1[round] = clientsServedPhase1[round] + 1
  347. gotClient[0] = 1
  348. //tells follower that this worker got a clientConnection
  349. writeTo(followerConnection, gotClient)
  350. //sends clients publicKey to follower
  351. m.RLock()
  352. clientPublicKey := clientData[clientConnection.RemoteAddr()].PublicKey
  353. m.RUnlock()
  354. writeTo(followerConnection, clientPublicKey[:])
  355. //setup the worker-specific db
  356. dbSize := int(C.dbSize)
  357. db := make([][]byte, dbSize)
  358. for i := 0; i < dbSize; i++ {
  359. db[i] = make([]byte, int(C.db[i].dataSize))
  360. }
  361. //tells client that phase 1 has begun
  362. errorBool := writeToWError(clientConnection, phase, followerConnection, 5)
  363. if errorBool {
  364. contBool := handleClientDC(wg, followerConnection, phase1Channel)
  365. if contBool {
  366. continue
  367. } else {
  368. return
  369. }
  370. }
  371. //tells client current dbWriteSize
  372. errorBool = writeToWError(clientConnection, intToByte(int(dbWriteSize)), followerConnection, 5)
  373. if errorBool {
  374. contBool := handleClientDC(wg, followerConnection, phase1Channel)
  375. if contBool {
  376. continue
  377. } else {
  378. return
  379. }
  380. }
  381. //tells client current round
  382. errorBool = writeToWError(clientConnection, roundAsBytes, followerConnection, 5)
  383. if errorBool {
  384. contBool := handleClientDC(wg, followerConnection, phase1Channel)
  385. if contBool {
  386. continue
  387. } else {
  388. return
  389. }
  390. }
  391. //begin auditing
  392. if id == 0 && firstAuditPrint {
  393. auditingStart = time.Now()
  394. }
  395. m.RLock()
  396. var clientKeys = clientData[clientConnection.RemoteAddr()]
  397. m.RUnlock()
  398. clientKeys, pirQuery, errorBool := handlePirQuery(clientKeys, clientConnection, followerConnection, 0, true)
  399. if errorBool {
  400. contBool := handleClientDC(wg, followerConnection, phase1Channel)
  401. if contBool {
  402. continue
  403. } else {
  404. return
  405. }
  406. }
  407. errorBool = getSendVirtualAddress(pirQuery[0], virtualAddresses, clientKeys.SharedSecret, clientConnection, followerConnection)
  408. if errorBool {
  409. contBool := handleClientDC(wg, followerConnection, phase1Channel)
  410. if contBool {
  411. continue
  412. } else {
  413. return
  414. }
  415. }
  416. if id == 0 && firstAuditPrint {
  417. firstAuditPrint = false
  418. auditingEnd = time.Now()
  419. log.Println("Auditing duration", time.Since(auditingStart).Seconds(), "numVirtualAddresses", len(virtualAddresses))
  420. }
  421. m.Lock()
  422. clientData[clientConnection.RemoteAddr()] = clientKeys
  423. m.Unlock()
  424. //accept dpfQuery from client
  425. dpfLengthBytes, errorBool := readFrom(clientConnection, 4, followerConnection, 5)
  426. if errorBool {
  427. contBool := handleClientDC(wg, followerConnection, phase1Channel)
  428. if contBool {
  429. continue
  430. } else {
  431. return
  432. }
  433. }
  434. dpfLength := byteToInt(dpfLengthBytes)
  435. dpfQueryAEncrypted, errorBool := readFrom(clientConnection, dpfLength, followerConnection, 5)
  436. if errorBool {
  437. contBool := handleClientDC(wg, followerConnection, phase1Channel)
  438. if contBool {
  439. continue
  440. } else {
  441. return
  442. }
  443. }
  444. dpfQueryBEncrypted, errorBool := readFrom(clientConnection, dpfLength, followerConnection, 5)
  445. if errorBool {
  446. contBool := handleClientDC(wg, followerConnection, phase1Channel)
  447. if contBool {
  448. continue
  449. } else {
  450. return
  451. }
  452. }
  453. writeToWError(followerConnection, dpfLengthBytes, nil, 0)
  454. writeToWError(followerConnection, dpfQueryBEncrypted, nil, 0)
  455. //decrypt dpfQueryA for sorting into db
  456. var decryptNonce [24]byte
  457. copy(decryptNonce[:], dpfQueryAEncrypted[:24])
  458. dpfQueryA, ok := box.Open(nil, dpfQueryAEncrypted[24:], &decryptNonce, clientPublicKey, leaderPrivateKey)
  459. if !ok {
  460. panic("dpfQueryA decryption not ok")
  461. }
  462. ds := int(C.db[0].dataSize)
  463. dataShareLeader := make([]byte, ds)
  464. pos := C.getUint128_t(C.int(virtualAddresses[int(dbWriteSize)]))
  465. C.evalDPF(C.ctx[id], (*C.uchar)(&dpfQueryA[0]), pos, C.int(ds), (*C.uchar)(&dataShareLeader[0]))
  466. dataShareFollower, _ := readFrom(followerConnection, ds, nil, 0)
  467. writeToWError(followerConnection, dataShareLeader, nil, 0)
  468. auditXOR := make([]byte, ds)
  469. passedAudit := true
  470. for i := 0; i < ds; i++ {
  471. auditXOR[i] = dataShareLeader[i] ^ dataShareFollower[i]
  472. //client tried to write to a position that is not a virtuallAddress
  473. if auditXOR[i] != 0 {
  474. clientConnection.Close()
  475. passedAudit = false
  476. }
  477. }
  478. if passedAudit {
  479. //run dpf, xor into local db
  480. for i := 0; i < dbSize; i++ {
  481. ds := int(C.db[i].dataSize)
  482. dataShare := make([]byte, ds)
  483. pos := C.getUint128_t(C.int(virtualAddresses[i]))
  484. C.evalDPF(C.ctx[id], (*C.uchar)(&dpfQueryA[0]), pos, C.int(ds), (*C.uchar)(&dataShare[0]))
  485. for j := 0; j < ds; j++ {
  486. db[i][j] = db[i][j] ^ dataShare[j]
  487. }
  488. }
  489. //xor the worker's DB into the main DB
  490. for i := 0; i < dbSize; i++ {
  491. m.Lock()
  492. C.xorIn(C.int(i), (*C.uchar)(&db[i][0]))
  493. m.Unlock()
  494. }
  495. phase3Channel <- clientConnection
  496. }
  497. //loop that waits for new client or leaves phase1 if time is up
  498. //todo! remove && len(phase1Channel) > 0
  499. for {
  500. if time.Since(startTimeRound) < maxTimePerRound && len(phase1Channel) > 0 {
  501. //this worker handles the next client
  502. if len(phase1Channel) > 0 {
  503. break
  504. //this worker waits for next client
  505. } else {
  506. time.Sleep(1 * time.Second)
  507. }
  508. //times up
  509. } else {
  510. //tells follower that this worker is done
  511. gotClient[0] = 0
  512. writeTo(followerConnection, gotClient)
  513. wg.Done()
  514. return
  515. }
  516. }
  517. }
  518. }
  519. func phase2(followerConnection net.Conn) {
  520. //gets current seed
  521. seedLeader := make([]byte, 16)
  522. C.readSeed((*C.uchar)(&seedLeader[0]))
  523. //get data
  524. dbSize := int(C.dbSize)
  525. tmpdbLeader := make([][]byte, dbSize)
  526. for i := range tmpdbLeader {
  527. tmpdbLeader[i] = make([]byte, dataLength)
  528. }
  529. for i := 0; i < dbSize; i++ {
  530. C.readData(C.int(i), (*C.uchar)(&tmpdbLeader[i][0]))
  531. }
  532. //writes seed to follower
  533. writeTo(followerConnection, seedLeader)
  534. //write data to follower
  535. //this is surely inefficent
  536. for i := 0; i < dbSize; i++ {
  537. writeTo(followerConnection, tmpdbLeader[i])
  538. }
  539. //receive seed from follower
  540. seedFollower, _ := readFrom(followerConnection, 16, nil, 0)
  541. //receive data from follower
  542. tmpdbFollower := make([][]byte, dbSize)
  543. for i := range tmpdbFollower {
  544. tmpdbFollower[i] = make([]byte, dataLength)
  545. }
  546. for i := 0; i < dbSize; i++ {
  547. tmpdbFollower[i], _ = readFrom(followerConnection, dataLength, nil, 0)
  548. }
  549. //put together the db
  550. tmpdb := make([][]byte, dbSize)
  551. for i := range tmpdb {
  552. tmpdb[i] = make([]byte, dataLength)
  553. }
  554. //get own Ciphers
  555. ciphersLeader := make([]*C.uchar, dbSize)
  556. for i := 0; i < dbSize; i++ {
  557. ciphersLeader[i] = (*C.uchar)(C.malloc(16))
  558. }
  559. for i := 0; i < dbSize; i++ {
  560. C.getCipher(1, C.int(i), ciphersLeader[i])
  561. }
  562. //send own Ciphers to follower
  563. for i := 0; i < dbSize; i++ {
  564. writeTo(followerConnection, C.GoBytes(unsafe.Pointer(ciphersLeader[i]), 16))
  565. }
  566. //receive ciphers from follower
  567. ciphersFollower := make([]byte, dbSize*16)
  568. for i := 0; i < dbSize; i++ {
  569. _, err := followerConnection.Read(ciphersFollower[i*16:])
  570. if err != nil {
  571. panic(err)
  572. }
  573. }
  574. //put in ciphers from follower
  575. for i := 0; i < dbSize; i++ {
  576. C.putCipher(1, C.int(i), (*C.uchar)(&ciphersFollower[i*16]))
  577. }
  578. //decrypt each row
  579. for i := 0; i < dbSize; i++ {
  580. C.decryptRow(C.int(i), (*C.uchar)(&tmpdb[i][0]), (*C.uchar)(&tmpdbLeader[i][0]), (*C.uchar)(&tmpdbFollower[i][0]), (*C.uchar)(&seedLeader[0]), (*C.uchar)(&seedFollower[0]))
  581. }
  582. var tweets []lib.Tweet
  583. var currentPublisherAmount float64
  584. var collisions float64
  585. for i := 0; i < dbSize; i++ {
  586. //discard cover message
  587. if tmpdb[i][1] == 0 {
  588. continue
  589. //collision
  590. } else if -1 == strings.Index(string(tmpdb[i]), ";;") {
  591. currentPublisherAmount++
  592. currentPublisherAmount++
  593. collisions++
  594. continue
  595. } else {
  596. currentPublisherAmount++
  597. //reconstruct tweet
  598. var position int = 0
  599. var topics []string
  600. var topic string
  601. var text string
  602. for _, letter := range tmpdb[i] {
  603. if string(letter) == ";" {
  604. if topic != "" {
  605. topics = append(topics, topic)
  606. topic = ""
  607. }
  608. position++
  609. } else {
  610. if position == 0 {
  611. if string(letter) == "," {
  612. topics = append(topics, topic)
  613. topic = ""
  614. } else {
  615. //if topics are
  616. //int
  617. //topic = topic + fmt.Sprint(int(letter))
  618. //string
  619. topic = topic + string(letter)
  620. }
  621. } else if position == 1 {
  622. text = text + string(letter)
  623. }
  624. }
  625. }
  626. tweet := lib.Tweet{"", -1, topics, text, round}
  627. if text != "" {
  628. tweets = append(tweets, tweet)
  629. } else {
  630. //this is a odd(number) way collisions
  631. collisions++
  632. }
  633. }
  634. }
  635. collisionCounter = append(collisionCounter, collisions)
  636. log.Println("Collision percentage this round", collisions/dbWriteSize, "dbWriteSize", dbWriteSize)
  637. lib.NewEntries(tweets, 0)
  638. C.resetDb()
  639. //calculates the publisherAverage over the last publisherRounds rounds
  640. index := round % publisherRounds
  641. publisherHistory[index] = int(currentPublisherAmount)
  642. log.Println("currentPublisherAmount", currentPublisherAmount, "publisher percentage", currentPublisherAmount/clientsConnected)
  643. var publisherAmount int
  644. for _, num := range publisherHistory {
  645. publisherAmount += num
  646. }
  647. publisherAverage := 0
  648. if round < publisherRounds {
  649. publisherAverage = publisherAmount / round
  650. } else {
  651. publisherAverage = publisherAmount / publisherRounds
  652. }
  653. //calculates the dbWriteSize for this round
  654. dbWriteSize = math.Ceil(dbSizeFactor * float64(publisherAverage))
  655. if dbWriteSize < minDBWriteSize {
  656. dbWriteSize = minDBWriteSize
  657. }
  658. //writes dbWriteSize of current round to follower
  659. writeTo(followerConnection, intToByte(int(dbWriteSize)))
  660. lib.CleanUpdbR(round)
  661. }
  662. //opti! mb it is quicker to send updated topicLists to clients first so pirQuerys are ready
  663. func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGroup, m *sync.RWMutex) {
  664. gotClient := make([]byte, 1)
  665. gotClient[0] = 0
  666. //wait until time is up
  667. for len(phase3Channel) == 0 {
  668. if time.Since(startTimeRound) > maxTimePerRound*2 {
  669. //tells follower that this worker is done
  670. writeToWError(followerConnection, gotClient, nil, 0)
  671. wg.Done()
  672. return
  673. }
  674. time.Sleep(1 * time.Second)
  675. }
  676. for clientConnection := range phase3Channel {
  677. clientsServedPhase3[round] = clientsServedPhase3[round] + 1
  678. if clientsServedPhase3[round]%1000 == 0 {
  679. fmt.Println("clientsServedPhase3", clientsServedPhase3[round])
  680. fmt.Println("timeTaken", time.Since(startPhase3).Seconds())
  681. }
  682. gotClient[0] = 1
  683. //tells follower that this worker got a clientConnection
  684. writeToWError(followerConnection, gotClient, nil, 0)
  685. //tells client current phase
  686. errorBool := writeToWError(clientConnection, phase, followerConnection, 2)
  687. if errorBool {
  688. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  689. if contBool {
  690. continue
  691. } else {
  692. return
  693. }
  694. }
  695. /*
  696. possible Values
  697. 0 : new client
  698. leader expects sharedSecrets, expects pirQuery
  699. 1 : update needed
  700. leader sends topicList, performs local update of sharedSecret, expects pirQuery
  701. 2 : no update needed
  702. nothing
  703. */
  704. subPhase := make([]byte, 1)
  705. //gets the data for the current client
  706. m.RLock()
  707. var clientKeys = clientData[clientConnection.RemoteAddr()]
  708. m.RUnlock()
  709. var roundsParticipating = clientKeys.roundsParticipating
  710. //client participates for the first time
  711. if roundsParticipating == 0 {
  712. subPhase[0] = 0
  713. } else if roundsParticipating%roundsBeforeUpdate == 0 {
  714. subPhase[0] = 1
  715. } else {
  716. subPhase[0] = 2
  717. }
  718. //tells client what leader expects
  719. errorBool = writeToWError(clientConnection, subPhase, followerConnection, 2)
  720. if errorBool {
  721. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  722. if contBool {
  723. continue
  724. } else {
  725. return
  726. }
  727. }
  728. //tells follower what will happen
  729. writeToWError(followerConnection, subPhase, nil, 0)
  730. //sends clients publicKey so follower knows which client is being served
  731. writeTo(followerConnection, clientKeys.PublicKey[:])
  732. //increases rounds participating for client
  733. clientKeys.roundsParticipating = roundsParticipating + 1
  734. //declaring variables here to prevent dupclicates later
  735. m.RLock()
  736. var sharedSecret [32]byte = clientData[clientConnection.RemoteAddr()].SharedSecret
  737. m.RUnlock()
  738. if subPhase[0] == 0 {
  739. errorBool := sendTopicLists(clientConnection, followerConnection, false)
  740. if errorBool {
  741. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  742. if contBool {
  743. continue
  744. } else {
  745. return
  746. }
  747. }
  748. clientKeys, _, errorBool = handlePirQuery(clientKeys, clientConnection, followerConnection, int(subPhase[0]), false)
  749. if errorBool {
  750. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  751. if contBool {
  752. continue
  753. } else {
  754. return
  755. }
  756. }
  757. } else if subPhase[0] == 1 {
  758. errorBool := sendTopicLists(clientConnection, followerConnection, false)
  759. if errorBool {
  760. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  761. if contBool {
  762. continue
  763. } else {
  764. return
  765. }
  766. }
  767. //updates sharedSecret
  768. sharedSecret = sha256.Sum256(sharedSecret[:])
  769. clientKeys.SharedSecret = sharedSecret
  770. clientKeys, _, errorBool = handlePirQuery(clientKeys, clientConnection, followerConnection, int(subPhase[0]), false)
  771. if errorBool {
  772. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  773. if contBool {
  774. continue
  775. } else {
  776. return
  777. }
  778. }
  779. }
  780. errorBool = getSendTweets(clientKeys, nil, clientConnection, followerConnection)
  781. if errorBool {
  782. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  783. if contBool {
  784. continue
  785. } else {
  786. return
  787. }
  788. }
  789. wantsArchive, errorBool := readFrom(clientConnection, 1, followerConnection, 2)
  790. if errorBool {
  791. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  792. if contBool {
  793. continue
  794. } else {
  795. return
  796. }
  797. }
  798. writeToWError(followerConnection, wantsArchive, nil, 0)
  799. if wantsArchive[0] == 1 && archiveTopicAmount > 0 {
  800. _, archiveQuerys, errorBool := handlePirQuery(clientKeys, clientConnection, followerConnection, -1, false)
  801. if errorBool {
  802. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  803. if contBool {
  804. continue
  805. } else {
  806. return
  807. }
  808. }
  809. errorBool = getSendTweets(clientKeys, archiveQuerys, clientConnection, followerConnection)
  810. if errorBool {
  811. contBool := handleClientDC(wg, followerConnection, phase3Channel)
  812. if contBool {
  813. continue
  814. } else {
  815. return
  816. }
  817. }
  818. }
  819. //saves all changes for client
  820. m.Lock()
  821. clientData[clientConnection.RemoteAddr()] = clientKeys
  822. m.Unlock()
  823. phase1Channel <- clientConnection
  824. //todo! remove && len(phase3Channel) > 0
  825. for {
  826. if time.Since(startTimeRound) < 2*maxTimePerRound && len(phase3Channel) > 0 {
  827. //this worker handles the next client
  828. if len(phase3Channel) > 0 {
  829. break
  830. //this worker waits for next client
  831. } else {
  832. time.Sleep(1 * time.Second)
  833. }
  834. //times up
  835. } else {
  836. //tells follower that this worker is done
  837. gotClient[0] = 0
  838. writeToWError(followerConnection, gotClient, nil, 0)
  839. wg.Done()
  840. return
  841. }
  842. }
  843. }
  844. }
  845. //returns true if there is another client
  846. func handleClientDC(wg *sync.WaitGroup, followerConnection net.Conn, channel chan net.Conn) bool {
  847. //loop that waits for new client or leaves phase1 if time is up
  848. for {
  849. if time.Since(startTimeRound) < maxTimePerRound {
  850. //this worker handles the next client
  851. if len(channel) > 0 {
  852. return true
  853. //this worker waits for next client
  854. } else {
  855. time.Sleep(1 * time.Second)
  856. }
  857. //times up
  858. } else {
  859. //tells follower that this worker is done
  860. gotClient := make([]byte, 1)
  861. gotClient[0] = 0
  862. writeTo(followerConnection, gotClient)
  863. wg.Done()
  864. return false
  865. }
  866. }
  867. }
  868. func createVirtualAddresses() []int {
  869. //array will be filled with unique random ascending values
  870. //adapted from: https://stackoverflow.com/questions/20039025/java-array-of-unique-randomly-generated-integers
  871. //+extraPositions to have a position to evaluate each received message
  872. arraySize := int(dbWriteSize) + extraPositions
  873. var maxInt int = int(math.Pow(2, 31))
  874. virtualAddresses := make([]int, arraySize)
  875. for i := 0; i < arraySize; i++ {
  876. virtualAddresses[i] = mr.Intn(maxInt)
  877. for j := 0; j < i; j++ {
  878. if virtualAddresses[i] == virtualAddresses[j] {
  879. i--
  880. break
  881. }
  882. }
  883. }
  884. sort.Ints(virtualAddresses)
  885. return virtualAddresses
  886. }
  887. func getSendVirtualAddress(pirQuery []byte, virtualAddresses []int, sharedSecret [32]byte, clientConnection, followerConnection net.Conn) bool {
  888. //xores all requested addresses into virtuallAddress
  889. virtualAddress := make([]byte, 4)
  890. for index, num := range pirQuery {
  891. if num == 1 {
  892. currentAddress := intToByte(virtualAddresses[index])
  893. for i := 0; i < 4; i++ {
  894. virtualAddress[i] = virtualAddress[i] ^ currentAddress[i]
  895. }
  896. }
  897. }
  898. //xores the sharedSecret
  899. for i := 0; i < 4; i++ {
  900. virtualAddress[i] = virtualAddress[i] ^ sharedSecret[i]
  901. }
  902. virtualAddressFollower, _ := readFrom(followerConnection, 4, nil, 0)
  903. //xores the data from follower
  904. for i := 0; i < 4; i++ {
  905. virtualAddress[i] = virtualAddress[i] ^ virtualAddressFollower[i]
  906. }
  907. errorBool := writeToWError(clientConnection, virtualAddress, followerConnection, 5)
  908. return errorBool
  909. }
  910. func getSendTweets(clientKeys clientKeys, archiveQuerys [][]byte, clientConnection, followerConnection net.Conn) bool {
  911. tmpNeededSubscriptions := neededSubscriptions
  912. if tmpNeededSubscriptions > topicAmount {
  913. tmpNeededSubscriptions = topicAmount
  914. }
  915. if archiveQuerys != nil {
  916. tmpNeededSubscriptions = len(archiveQuerys)
  917. if tmpNeededSubscriptions > archiveTopicAmount {
  918. tmpNeededSubscriptions = archiveTopicAmount
  919. }
  920. }
  921. tweets := make([][]byte, tmpNeededSubscriptions)
  922. for i := 0; i < tmpNeededSubscriptions; i++ {
  923. //gets all requested tweets
  924. if archiveQuerys == nil {
  925. tweets[i] = lib.GetTweets(clientKeys.PirQuery[i], dataLength, 0, *clientKeys.PublicKey)
  926. } else {
  927. tweets[i] = lib.GetTweets(archiveQuerys[i], dataLength, 1, *clientKeys.PublicKey)
  928. }
  929. //expand sharedSecret so it is of right length
  930. expandBy := len(tweets[i]) / 32
  931. var expandedSharedSecret []byte
  932. for i := 0; i < expandBy; i++ {
  933. expandedSharedSecret = append(expandedSharedSecret, clientKeys.SharedSecret[:]...)
  934. }
  935. //Xor's sharedSecret with all tweets
  936. lib.Xor(expandedSharedSecret[:], tweets[i])
  937. blockLength := len(tweets[i])
  938. receivedTweets, _ := readFrom(followerConnection, blockLength, nil, 0)
  939. lib.Xor(receivedTweets, tweets[i])
  940. }
  941. //sends tweets to client
  942. for i := 0; i < tmpNeededSubscriptions; i++ {
  943. tweetsLengthBytes := intToByte(len(tweets[i]))
  944. if firstTweetSend && archiveQuerys == nil {
  945. firstTweetSend = false
  946. log.Println("sending", len(tweets[0]), "bytes of data")
  947. }
  948. errorBool := writeToWError(clientConnection, tweetsLengthBytes, followerConnection, 2)
  949. if errorBool {
  950. return true
  951. }
  952. errorBool = writeToWError(clientConnection, tweets[i], followerConnection, 2)
  953. if errorBool {
  954. return true
  955. }
  956. }
  957. return false
  958. }
  959. func handlePirQuery(clientKeys clientKeys, clientConnection net.Conn, followerConnection net.Conn, subPhase int, doAuditing bool) (clientKeys, [][]byte, bool) {
  960. clientPublicKey := clientKeys.PublicKey
  961. //gets the msg length
  962. msgLengthBytes, errorBool := readFrom(clientConnection, 4, followerConnection, 5)
  963. if errorBool {
  964. return clientKeys, nil, true
  965. }
  966. msgLength := byteToInt(msgLengthBytes)
  967. //gets the leader box
  968. leaderBox, errorBool := readFrom(clientConnection, msgLength, followerConnection, 5)
  969. if errorBool {
  970. return clientKeys, nil, true
  971. }
  972. //gets the follower box
  973. followerBox, errorBool := readFrom(clientConnection, msgLength, followerConnection, 5)
  974. if errorBool {
  975. return clientKeys, nil, true
  976. }
  977. tmpNeededSubscriptions := neededSubscriptions
  978. if tmpNeededSubscriptions > topicAmount {
  979. tmpNeededSubscriptions = topicAmount
  980. }
  981. tmpTopicAmount := topicAmount
  982. if subPhase == -1 {
  983. archiveNeededSubscriptions, errorBool := readFrom(clientConnection, 4, followerConnection, 5)
  984. if errorBool {
  985. return clientKeys, nil, true
  986. }
  987. writeToWError(followerConnection, archiveNeededSubscriptions, nil, 0)
  988. tmpNeededSubscriptions = byteToInt(archiveNeededSubscriptions)
  989. tmpTopicAmount = archiveTopicAmount
  990. if tmpNeededSubscriptions > archiveTopicAmount {
  991. tmpNeededSubscriptions = archiveTopicAmount
  992. }
  993. }
  994. if doAuditing {
  995. tmpNeededSubscriptions = 1
  996. tmpTopicAmount = int(dbWriteSize)
  997. }
  998. //send length to follower
  999. writeToWError(followerConnection, msgLengthBytes, nil, 0)
  1000. //send box to follower
  1001. writeToWError(followerConnection, followerBox, nil, 0)
  1002. var decryptNonce [24]byte
  1003. copy(decryptNonce[:], leaderBox[:24])
  1004. decrypted, ok := box.Open(nil, leaderBox[24:], &decryptNonce, clientPublicKey, leaderPrivateKey)
  1005. if !ok {
  1006. fmt.Println("pirQuery decryption not ok")
  1007. return clientKeys, nil, true
  1008. }
  1009. //if sharedSecret is send
  1010. if subPhase == 0 {
  1011. var tmpSharedSecret [32]byte
  1012. for index := 0; index < 32; index++ {
  1013. tmpSharedSecret[index] = decrypted[index]
  1014. }
  1015. clientKeys.SharedSecret = tmpSharedSecret
  1016. decrypted = decrypted[32:]
  1017. }
  1018. if doAuditing {
  1019. result := make([][]byte, 1)
  1020. result[0] = decrypted
  1021. return clientKeys, result, false
  1022. }
  1023. if subPhase == -1 {
  1024. }
  1025. //transforms byteArray to ints of wanted topics
  1026. pirQueryFlattened := decrypted
  1027. pirQuerys := make([][]byte, tmpNeededSubscriptions)
  1028. for i := range pirQuerys {
  1029. pirQuerys[i] = make([]byte, tmpTopicAmount)
  1030. }
  1031. for i := 0; i < tmpNeededSubscriptions; i++ {
  1032. pirQuerys[i] = pirQueryFlattened[i*tmpTopicAmount : (i+1)*tmpTopicAmount]
  1033. }
  1034. //sets the pirQuery for the client in case whe are not archiving, and not Auditing
  1035. if subPhase != -1 {
  1036. clientKeys.PirQuery = pirQuerys
  1037. }
  1038. return clientKeys, pirQuerys, false
  1039. }
  1040. func transformBytesToStringArray(topicsAsBytes []byte) []string {
  1041. var topics []string
  1042. var topic string
  1043. var position int = 0
  1044. for _, letter := range topicsAsBytes {
  1045. if string(letter) == "," {
  1046. topics[position] = topic
  1047. topic = ""
  1048. position++
  1049. } else {
  1050. topic = topic + string(letter)
  1051. }
  1052. }
  1053. return topics
  1054. }
  1055. func byteToInt(myBytes []byte) (x int) {
  1056. x = int(myBytes[3])<<24 + int(myBytes[2])<<16 + int(myBytes[1])<<8 + int(myBytes[0])
  1057. return
  1058. }
  1059. //returns true if error occured
  1060. func sendTopicLists(clientConnection, followerConnection net.Conn, setup bool) bool {
  1061. for i := 0; i < 2; i++ {
  1062. var topicList []byte
  1063. if i == 0 {
  1064. topicList, topicAmount = lib.GetTopicList(i)
  1065. } else {
  1066. topicList, archiveTopicAmount = lib.GetTopicList(i)
  1067. }
  1068. topicListLengthBytes := intToByte(len(topicList))
  1069. if !setup {
  1070. err := writeToWError(clientConnection, topicListLengthBytes, followerConnection, 5)
  1071. if err {
  1072. return true
  1073. }
  1074. err = writeToWError(clientConnection, topicList, followerConnection, 5)
  1075. if err {
  1076. return true
  1077. }
  1078. } else {
  1079. _, err := clientConnection.Write(topicListLengthBytes)
  1080. if err != nil {
  1081. return true
  1082. }
  1083. _, err = clientConnection.Write(topicList)
  1084. if err != nil {
  1085. return true
  1086. }
  1087. }
  1088. }
  1089. return false
  1090. }
  1091. //sends the array to the connection
  1092. func writeTo(connection net.Conn, array []byte) {
  1093. remainingLength := len(array)
  1094. for remainingLength > 0 {
  1095. if remainingLength >= mtu {
  1096. _, err := connection.Write(array[:mtu])
  1097. if err != nil {
  1098. panic(err)
  1099. }
  1100. array = array[mtu:]
  1101. remainingLength -= mtu
  1102. } else {
  1103. _, err := connection.Write(array)
  1104. if err != nil {
  1105. panic(err)
  1106. }
  1107. remainingLength = 0
  1108. }
  1109. }
  1110. }
  1111. func writeToWError(connection net.Conn, array []byte, followerConnection net.Conn, size int) bool {
  1112. if connection.RemoteAddr().String() == follower {
  1113. arrayWError := make([]byte, 1)
  1114. arrayWError = append(arrayWError, array[:]...)
  1115. remainingLength := len(arrayWError)
  1116. for remainingLength > 0 {
  1117. if remainingLength >= mtu {
  1118. _, err := connection.Write(arrayWError[:mtu])
  1119. if err != nil {
  1120. return handleError(connection, followerConnection, size, err)
  1121. }
  1122. arrayWError = arrayWError[mtu:]
  1123. remainingLength -= mtu
  1124. } else {
  1125. _, err := connection.Write(arrayWError)
  1126. if err != nil {
  1127. return handleError(connection, followerConnection, size, err)
  1128. }
  1129. remainingLength = 0
  1130. }
  1131. }
  1132. } else {
  1133. remainingLength := len(array)
  1134. for remainingLength > 0 {
  1135. if remainingLength >= mtu {
  1136. _, err := connection.Write(array[:mtu])
  1137. if err != nil {
  1138. return handleError(connection, followerConnection, size, err)
  1139. }
  1140. array = array[mtu:]
  1141. remainingLength -= mtu
  1142. } else {
  1143. _, err := connection.Write(array)
  1144. if err != nil {
  1145. return handleError(connection, followerConnection, size, err)
  1146. }
  1147. remainingLength = 0
  1148. }
  1149. }
  1150. }
  1151. return false
  1152. }
  1153. func handleError(connection, followerConnection net.Conn, size int, err error) bool {
  1154. if err != nil {
  1155. //lets follower know that client has disconnected unexpectedly
  1156. if connection.RemoteAddr().String() != follower {
  1157. if size > mtu {
  1158. fmt.Println("have a look here")
  1159. }
  1160. fmt.Println("handleError", err)
  1161. array := make([]byte, size)
  1162. array[0] = 1
  1163. _, err = followerConnection.Write(array)
  1164. if err != nil {
  1165. panic(err)
  1166. }
  1167. return true
  1168. } else {
  1169. panic(err)
  1170. }
  1171. }
  1172. return false
  1173. }
  1174. //reads an array which is returned and of size "size" from the connection
  1175. //returns true if error occured
  1176. func readFrom(connection net.Conn, size int, followerConnection net.Conn, sizeError int) ([]byte, bool) {
  1177. var array []byte
  1178. remainingSize := size
  1179. for remainingSize > 0 {
  1180. var err error
  1181. toAppend := make([]byte, mtu)
  1182. if remainingSize > mtu {
  1183. _, err = connection.Read(toAppend)
  1184. array = append(array, toAppend...)
  1185. remainingSize -= mtu
  1186. } else {
  1187. _, err = connection.Read(toAppend[:remainingSize])
  1188. array = append(array, toAppend[:remainingSize]...)
  1189. remainingSize = 0
  1190. }
  1191. if err != nil {
  1192. //lets follower know that client has disconnected unexpectedly
  1193. if connection.RemoteAddr().String() != follower {
  1194. fmt.Println(err)
  1195. array := make([]byte, sizeError)
  1196. array[0] = 1
  1197. _, err = followerConnection.Write(array)
  1198. if err != nil {
  1199. panic(err)
  1200. }
  1201. return nil, true
  1202. } else {
  1203. panic(err)
  1204. }
  1205. }
  1206. }
  1207. return array, false
  1208. }
  1209. func intToByte(myInt int) (retBytes []byte) {
  1210. retBytes = make([]byte, 4)
  1211. retBytes[3] = byte((myInt >> 24) & 0xff)
  1212. retBytes[2] = byte((myInt >> 16) & 0xff)
  1213. retBytes[1] = byte((myInt >> 8) & 0xff)
  1214. retBytes[0] = byte(myInt & 0xff)
  1215. return
  1216. }