package main //#cgo CFLAGS: -fopenmp -O2 //#cgo LDFLAGS: -lcrypto -lm -fopenmp //#include "../c/dpf.h" //#include "../c/okv.h" //#include "../c/dpf.c" //#include "../c/okv.c" import "C" import ( "crypto/rand" "crypto/rsa" "crypto/sha256" "crypto/tls" "crypto/x509" "crypto/x509/pkix" "encoding/pem" "fmt" "log" "math" "math/big" mr "math/rand" "net" "os" "sort" "strings" "sync" "time" lib "2PPS/lib" "unsafe" "golang.org/x/crypto/nacl/box" ) //this stores all neccessary information for each client type clientKeys struct { roundsParticipating int PublicKey *[32]byte SharedSecret [32]byte PirQuery [][]byte } var clientData = make(map[net.Addr]clientKeys) const follower string = "127.0.0.1:4442" //Maximum Transport Unit const mtu int = 1100 var leaderPrivateKey *[32]byte var leaderPublicKey *[32]byte var followerPublicKey *[32]byte const maxNumberOfClients = 1000000 var topicList []byte var topicAmount int var archiveTopicAmount int // every roundsBeforeUpdate the client updates his pirQuery const roundsBeforeUpdate = 1 const neededSubscriptions = 1 const numThreads = 12 const dataLength = 256 const numClients = 1000 const minDBWriteSize = numClients * 0.5 //riposte says 19.5 const dbSizeFactor = 5 const publisherGuess = 0.2 var dbWriteSize float64 = numClients * dbSizeFactor * publisherGuess //this is the number of positions for auditing var extraPositions int = 10 var collisionCounter []float64 var clientsConnected float64 var maxTimePerRound time.Duration = 3 * time.Second //counts the number of rounds var round int var clientsServedPhase1 []int var clientsServedPhase3 []int var startPhase1 time.Time var startPhase2 time.Time var startPhase3 time.Time //only prints auditing time for one client per round var firstAuditPrint bool var auditingStart time.Time var auditingEnd time.Time var startTime time.Time var startTimeRound time.Time //channel for goroutine communication with clients var phase1Channel = make(chan net.Conn, maxNumberOfClients) var phase3Channel = make(chan net.Conn, maxNumberOfClients) //variables for calculating the dbWrite size const publisherRounds int = 10 var publisherAmount float64 var publisherHistory [publisherRounds]int var firstTweetSend bool func main() { f, err := os.OpenFile("evalData", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) if err != nil { log.Fatalf("error opening file: %v", err) } defer f.Close() log.SetOutput(f) log.Println("roundsBeforeUpdate", roundsBeforeUpdate) log.Println("neededSubscriptions", neededSubscriptions) log.Println("dataLength", dataLength) log.Println("maxTimePerRound~", maxTimePerRound*3) log.Println("extraPositions", extraPositions) log.Println("numClients", numClients) log.Println("Archiving is done every 24h, no Clients wants Archive") //prevents race conditions for wrtiting m := &sync.RWMutex{} startTime = time.Now() clientsServedPhase1 = make([]int, 1000) clientsServedPhase3 = make([]int, 1000) generatedPublicKey, generatedPrivateKey, err := box.GenerateKey(rand.Reader) if err != nil { panic(err) } //why is this neccessary? leaderPrivateKey = generatedPrivateKey leaderPublicKey = generatedPublicKey C.initializeServer(C.int(numThreads)) //calls follower for setup conf := &tls.Config{ InsecureSkipVerify: true, } followerConnection, err := tls.Dial("tcp", follower, conf) if err != nil { panic(err) } followerConnection.SetDeadline(time.Time{}) //receives follower publicKey var tmpFollowerPubKey [32]byte _, err = followerConnection.Read(tmpFollowerPubKey[:]) if err != nil { panic(err) } followerPublicKey = &tmpFollowerPubKey //send publicKey to follower writeTo(followerConnection, leaderPublicKey[:]) writeTo(followerConnection, intToByte(neededSubscriptions)) writeTo(followerConnection, intToByte(int(dbWriteSize))) //goroutine for accepting new clients go func() { leaderConnectionPrivateKey, err := rsa.GenerateKey(rand.Reader, 2048) if err != nil { panic(err) } // Generate a pem block with the private key keyPem := pem.EncodeToMemory(&pem.Block{ Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(leaderConnectionPrivateKey), }) tml := x509.Certificate{ // you can add any attr that you need NotBefore: time.Now(), NotAfter: time.Now().AddDate(5, 0, 0), // you have to generate a different serial number each execution SerialNumber: big.NewInt(123123), Subject: pkix.Name{ CommonName: "New Name", Organization: []string{"New Org."}, }, BasicConstraintsValid: true, } cert, err := x509.CreateCertificate(rand.Reader, &tml, &tml, &leaderConnectionPrivateKey.PublicKey, leaderConnectionPrivateKey) if err != nil { panic(err) } // Generate a pem block with the certificate certPem := pem.EncodeToMemory(&pem.Block{ Type: "CERTIFICATE", Bytes: cert, }) tlsCert, err := tls.X509KeyPair(certPem, keyPem) if err != nil { panic(err) } config := &tls.Config{Certificates: []tls.Certificate{tlsCert}} //listens for clients lnClients, err := tls.Listen("tcp", ":4441", config) if err != nil { panic(err) } defer lnClients.Close() for { clientConnection, err := lnClients.Accept() if err != nil { fmt.Println("Client connection error 1", err) clientConnection.Close() break } clientConnection.SetDeadline(time.Time{}) //sends topicList so client can participate in phase 3 asap errorBool := sendTopicLists(clientConnection, followerConnection, true) if errorBool { break } //send leader publicKey _, err = clientConnection.Write(leaderPublicKey[:]) if err != nil { fmt.Println("Client connection error 2", err) clientConnection.Close() break } //send follower publicKey _, err = clientConnection.Write(followerPublicKey[:]) if err != nil { fmt.Println("Client connection error 3", err) clientConnection.Close() break } var clientPublicKey *[32]byte var tmpClientPublicKey [32]byte //gets publicKey from client _, err = clientConnection.Read(tmpClientPublicKey[:]) if err != nil { fmt.Println("Client connection error 4", err) clientConnection.Close() break } _, err = clientConnection.Write(intToByte(neededSubscriptions)) if err != nil { fmt.Println("Client connection error 5", err) clientConnection.Close() break } _, err = clientConnection.Write(intToByte(int(startTime.Unix()))) if err != nil { fmt.Println("Client connection error 6", err) clientConnection.Close() break } clientPublicKey = &tmpClientPublicKey //this is the key for map(client data) remoteAddress := clientConnection.RemoteAddr() //pirQuery will be added in phase 3 //bs! only want to set roundsParticipating and answerAmount to 0, mb there is a better way //will work for now var emptyArray [32]byte var emptyByteArray [][]byte keys := clientKeys{0, clientPublicKey, emptyArray, emptyByteArray} m.Lock() clientData[remoteAddress] = keys m.Unlock() phase1Channel <- clientConnection clientsConnected++ } }() wg := &sync.WaitGroup{} //the current phase phase := make([]byte, 1) for { startPhase1 = time.Now() startTimeRound = time.Now() firstAuditPrint = true phase[0] = 1 round++ fmt.Println("Round", round) fmt.Println("clientsServedPhase1", clientsServedPhase1[round-1]) fmt.Println("clientsServedPhase3", clientsServedPhase3[round-1]) log.Println() log.Println("Phase 1 Round", round) log.Println() bytesSaved := lib.GetBytesSaved() log.Println("bytesSaved Percentage", bytesSaved) //creates a new write Db for this round for i := 0; i < int(dbWriteSize); i++ { C.createDb(C.int(1), C.int(dataLength)) } //creates a new db containing virtual addresses for auditing virtualAddresses := createVirtualAddresses() //send all virtualAddresses to follower for i := 0; i <= int(dbWriteSize); i++ { writeTo(followerConnection, intToByte(virtualAddresses[i])) } //moves all clients to phase1 if len(phase3Channel) > 0 { for client := range phase3Channel { phase1Channel <- client if len(phase3Channel) == 0 { break } } } for id := 0; id < numThreads; id++ { wg.Add(1) followerConnection, err := tls.Dial("tcp", follower, conf) if err != nil { panic(err) } followerConnection.SetDeadline(time.Time{}) go phase1(id, phase, followerConnection, wg, m, virtualAddresses) } wg.Wait() log.Println("fullDurationPhase1", time.Since(startPhase1).Seconds()) log.Println("fullAuditingDuration~", auditingEnd.Sub(auditingStart).Seconds()*clientsConnected) log.Println("auditingPercentage", (auditingEnd.Sub(auditingStart).Seconds()*clientsConnected)/(time.Since(startPhase1).Seconds())) //Phase 2 startPhase2 = time.Now() followerConnection, err := tls.Dial("tcp", follower, conf) if err != nil { panic(err) } followerConnection.SetDeadline(time.Time{}) phase2(followerConnection) log.Println("fullDurationPhase2", time.Since(startPhase2).Seconds()) //Phase 3 //moves all clients to phase3 if len(phase1Channel) > 0 { for client := range phase1Channel { phase3Channel <- client if len(phase1Channel) == 0 { break } } } startPhase3 = time.Now() //no tweets -> continue to phase 1 and mb get tweets topicList, topicAmount = lib.GetTopicList(0) if len(topicList) == 0 { continue } firstTweetSend = true phase[0] = 3 startTimeRound = time.Now() for id := 0; id < numThreads; id++ { wg.Add(1) followerConnection, err := tls.Dial("tcp", follower, conf) if err != nil { panic(err) } followerConnection.SetDeadline(time.Time{}) go phase3(id, phase, followerConnection, wg, m) } wg.Wait() log.Println("fullDurationPhase3", time.Since(startPhase3).Seconds()) lib.CleanUpdbR(round) } } func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGroup, m *sync.RWMutex, virtualAddresses []int) { roundAsBytes := intToByte(round) gotClient := make([]byte, 1) gotClient[0] = 0 //wait until time is up for len(phase1Channel) == 0 { if time.Since(startTimeRound) > maxTimePerRound { //tells follower that this worker is done writeTo(followerConnection, gotClient) wg.Done() return } time.Sleep(1 * time.Second) } for clientConnection := range phase1Channel { clientsServedPhase1[round] = clientsServedPhase1[round] + 1 gotClient[0] = 1 //tells follower that this worker got a clientConnection writeTo(followerConnection, gotClient) //sends clients publicKey to follower m.RLock() clientPublicKey := clientData[clientConnection.RemoteAddr()].PublicKey m.RUnlock() writeTo(followerConnection, clientPublicKey[:]) //setup the worker-specific db dbSize := int(C.dbSize) db := make([][]byte, dbSize) for i := 0; i < dbSize; i++ { db[i] = make([]byte, int(C.db[i].dataSize)) } //tells client that phase 1 has begun errorBool := writeToWError(clientConnection, phase, followerConnection, 5) if errorBool { contBool := handleClientDC(wg, followerConnection, phase1Channel) if contBool { continue } else { return } } //tells client current dbWriteSize errorBool = writeToWError(clientConnection, intToByte(int(dbWriteSize)), followerConnection, 5) if errorBool { contBool := handleClientDC(wg, followerConnection, phase1Channel) if contBool { continue } else { return } } //tells client current round errorBool = writeToWError(clientConnection, roundAsBytes, followerConnection, 5) if errorBool { contBool := handleClientDC(wg, followerConnection, phase1Channel) if contBool { continue } else { return } } //begin auditing if id == 0 && firstAuditPrint { auditingStart = time.Now() } m.RLock() var clientKeys = clientData[clientConnection.RemoteAddr()] m.RUnlock() clientKeys, pirQuery, errorBool := handlePirQuery(clientKeys, clientConnection, followerConnection, 0, true) if errorBool { contBool := handleClientDC(wg, followerConnection, phase1Channel) if contBool { continue } else { return } } errorBool = getSendVirtualAddress(pirQuery[0], virtualAddresses, clientKeys.SharedSecret, clientConnection, followerConnection) if errorBool { contBool := handleClientDC(wg, followerConnection, phase1Channel) if contBool { continue } else { return } } if id == 0 && firstAuditPrint { firstAuditPrint = false auditingEnd = time.Now() log.Println("Auditing duration", time.Since(auditingStart).Seconds(), "numVirtualAddresses", len(virtualAddresses)) } m.Lock() clientData[clientConnection.RemoteAddr()] = clientKeys m.Unlock() //accept dpfQuery from client dpfLengthBytes, errorBool := readFrom(clientConnection, 4, followerConnection, 5) if errorBool { contBool := handleClientDC(wg, followerConnection, phase1Channel) if contBool { continue } else { return } } dpfLength := byteToInt(dpfLengthBytes) dpfQueryAEncrypted, errorBool := readFrom(clientConnection, dpfLength, followerConnection, 5) if errorBool { contBool := handleClientDC(wg, followerConnection, phase1Channel) if contBool { continue } else { return } } dpfQueryBEncrypted, errorBool := readFrom(clientConnection, dpfLength, followerConnection, 5) if errorBool { contBool := handleClientDC(wg, followerConnection, phase1Channel) if contBool { continue } else { return } } writeToWError(followerConnection, dpfLengthBytes, nil, 0) writeToWError(followerConnection, dpfQueryBEncrypted, nil, 0) //decrypt dpfQueryA for sorting into db var decryptNonce [24]byte copy(decryptNonce[:], dpfQueryAEncrypted[:24]) dpfQueryA, ok := box.Open(nil, dpfQueryAEncrypted[24:], &decryptNonce, clientPublicKey, leaderPrivateKey) if !ok { panic("dpfQueryA decryption not ok") } ds := int(C.db[0].dataSize) dataShareLeader := make([]byte, ds) pos := C.getUint128_t(C.int(virtualAddresses[int(dbWriteSize)])) C.evalDPF(C.ctx[id], (*C.uchar)(&dpfQueryA[0]), pos, C.int(ds), (*C.uchar)(&dataShareLeader[0])) dataShareFollower, _ := readFrom(followerConnection, ds, nil, 0) writeToWError(followerConnection, dataShareLeader, nil, 0) auditXOR := make([]byte, ds) passedAudit := true for i := 0; i < ds; i++ { auditXOR[i] = dataShareLeader[i] ^ dataShareFollower[i] //client tried to write to a position that is not a virtuallAddress if auditXOR[i] != 0 { clientConnection.Close() passedAudit = false } } if passedAudit { //run dpf, xor into local db for i := 0; i < dbSize; i++ { ds := int(C.db[i].dataSize) dataShare := make([]byte, ds) pos := C.getUint128_t(C.int(virtualAddresses[i])) C.evalDPF(C.ctx[id], (*C.uchar)(&dpfQueryA[0]), pos, C.int(ds), (*C.uchar)(&dataShare[0])) for j := 0; j < ds; j++ { db[i][j] = db[i][j] ^ dataShare[j] } } //xor the worker's DB into the main DB for i := 0; i < dbSize; i++ { m.Lock() C.xorIn(C.int(i), (*C.uchar)(&db[i][0])) m.Unlock() } phase3Channel <- clientConnection } //loop that waits for new client or leaves phase1 if time is up for { if time.Since(startTimeRound) < maxTimePerRound || len(phase1Channel) > 0 { //this worker handles the next client if len(phase1Channel) > 0 { break //this worker waits for next client } else { time.Sleep(1 * time.Second) } //times up } else { //tells follower that this worker is done gotClient[0] = 0 writeTo(followerConnection, gotClient) wg.Done() return } } } } func phase2(followerConnection net.Conn) { //gets current seed seedLeader := make([]byte, 16) C.readSeed((*C.uchar)(&seedLeader[0])) //get data dbSize := int(C.dbSize) tmpdbLeader := make([][]byte, dbSize) for i := range tmpdbLeader { tmpdbLeader[i] = make([]byte, dataLength) } for i := 0; i < dbSize; i++ { C.readData(C.int(i), (*C.uchar)(&tmpdbLeader[i][0])) } //writes seed to follower writeTo(followerConnection, seedLeader) //write data to follower //this is surely inefficent for i := 0; i < dbSize; i++ { writeTo(followerConnection, tmpdbLeader[i]) } //receive seed from follower seedFollower, _ := readFrom(followerConnection, 16, nil, 0) //receive data from follower tmpdbFollower := make([][]byte, dbSize) for i := range tmpdbFollower { tmpdbFollower[i] = make([]byte, dataLength) } for i := 0; i < dbSize; i++ { tmpdbFollower[i], _ = readFrom(followerConnection, dataLength, nil, 0) } //put together the db tmpdb := make([][]byte, dbSize) for i := range tmpdb { tmpdb[i] = make([]byte, dataLength) } //get own Ciphers ciphersLeader := make([]*C.uchar, dbSize) for i := 0; i < dbSize; i++ { ciphersLeader[i] = (*C.uchar)(C.malloc(16)) } for i := 0; i < dbSize; i++ { C.getCipher(1, C.int(i), ciphersLeader[i]) } //send own Ciphers to follower for i := 0; i < dbSize; i++ { writeTo(followerConnection, C.GoBytes(unsafe.Pointer(ciphersLeader[i]), 16)) } //receive ciphers from follower ciphersFollower := make([]byte, dbSize*16) for i := 0; i < dbSize; i++ { _, err := followerConnection.Read(ciphersFollower[i*16:]) if err != nil { panic(err) } } //put in ciphers from follower for i := 0; i < dbSize; i++ { C.putCipher(1, C.int(i), (*C.uchar)(&ciphersFollower[i*16])) } //decrypt each row for i := 0; i < dbSize; i++ { C.decryptRow(C.int(i), (*C.uchar)(&tmpdb[i][0]), (*C.uchar)(&tmpdbLeader[i][0]), (*C.uchar)(&tmpdbFollower[i][0]), (*C.uchar)(&seedLeader[0]), (*C.uchar)(&seedFollower[0])) } var tweets []lib.Tweet var currentPublisherAmount float64 var collisions float64 for i := 0; i < dbSize; i++ { //discard cover message if tmpdb[i][1] == 0 { continue //collision } else if -1 == strings.Index(string(tmpdb[i]), ";;") { currentPublisherAmount++ currentPublisherAmount++ collisions++ continue } else { currentPublisherAmount++ //reconstruct tweet var position int = 0 var topics []string var topic string var text string for _, letter := range tmpdb[i] { if string(letter) == ";" { if topic != "" { topics = append(topics, topic) topic = "" } position++ } else { if position == 0 { if string(letter) == "," { topics = append(topics, topic) topic = "" } else { //if topics are //int //topic = topic + fmt.Sprint(int(letter)) //string topic = topic + string(letter) } } else if position == 1 { text = text + string(letter) } } } tweet := lib.Tweet{"", -1, topics, text, round} if text != "" { tweets = append(tweets, tweet) } else { //this is a odd(number) way collisions collisions++ } } } collisionCounter = append(collisionCounter, collisions) log.Println("Collision percentage this round", collisions/dbWriteSize, "dbWriteSize", dbWriteSize) lib.NewEntries(tweets, 0) C.resetDb() //calculates the publisherAverage over the last publisherRounds rounds index := round % publisherRounds publisherHistory[index] = int(currentPublisherAmount) log.Println("currentPublisherAmount", currentPublisherAmount, "publisher percentage", currentPublisherAmount/clientsConnected) var publisherAmount int for _, num := range publisherHistory { publisherAmount += num } publisherAverage := 0 if round < publisherRounds { publisherAverage = publisherAmount / round } else { publisherAverage = publisherAmount / publisherRounds } //calculates the dbWriteSize for this round dbWriteSize = math.Ceil(dbSizeFactor * float64(publisherAverage)) if dbWriteSize < minDBWriteSize { dbWriteSize = minDBWriteSize } //writes dbWriteSize of current round to follower writeTo(followerConnection, intToByte(int(dbWriteSize))) lib.CleanUpdbR(round) } //opti! mb it is quicker to send updated topicLists to clients first so pirQuerys are ready func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGroup, m *sync.RWMutex) { gotClient := make([]byte, 1) gotClient[0] = 0 //wait until time is up for len(phase3Channel) == 0 { if time.Since(startTimeRound) > maxTimePerRound*2 { //tells follower that this worker is done writeToWError(followerConnection, gotClient, nil, 0) wg.Done() return } time.Sleep(1 * time.Second) } for clientConnection := range phase3Channel { clientsServedPhase3[round] = clientsServedPhase3[round] + 1 gotClient[0] = 1 //tells follower that this worker got a clientConnection writeToWError(followerConnection, gotClient, nil, 0) //tells client current phase errorBool := writeToWError(clientConnection, phase, followerConnection, 2) if errorBool { contBool := handleClientDC(wg, followerConnection, phase3Channel) if contBool { continue } else { return } } /* possible Values 0 : new client leader expects sharedSecrets, expects pirQuery 1 : update needed leader sends topicList, performs local update of sharedSecret, expects pirQuery 2 : no update needed nothing */ subPhase := make([]byte, 1) //gets the data for the current client m.RLock() var clientKeys = clientData[clientConnection.RemoteAddr()] m.RUnlock() var roundsParticipating = clientKeys.roundsParticipating //client participates for the first time if roundsParticipating == 0 { subPhase[0] = 0 } else if roundsParticipating%roundsBeforeUpdate == 0 { subPhase[0] = 1 } else { subPhase[0] = 2 } //tells client what leader expects errorBool = writeToWError(clientConnection, subPhase, followerConnection, 2) if errorBool { contBool := handleClientDC(wg, followerConnection, phase3Channel) if contBool { continue } else { return } } //tells follower what will happen writeToWError(followerConnection, subPhase, nil, 0) //sends clients publicKey so follower knows which client is being served writeTo(followerConnection, clientKeys.PublicKey[:]) //increases rounds participating for client clientKeys.roundsParticipating = roundsParticipating + 1 //declaring variables here to prevent dupclicates later m.RLock() var sharedSecret [32]byte = clientData[clientConnection.RemoteAddr()].SharedSecret m.RUnlock() if subPhase[0] == 0 { errorBool := sendTopicLists(clientConnection, followerConnection, false) if errorBool { contBool := handleClientDC(wg, followerConnection, phase3Channel) if contBool { continue } else { return } } clientKeys, _, errorBool = handlePirQuery(clientKeys, clientConnection, followerConnection, int(subPhase[0]), false) if errorBool { contBool := handleClientDC(wg, followerConnection, phase3Channel) if contBool { continue } else { return } } } else if subPhase[0] == 1 { errorBool := sendTopicLists(clientConnection, followerConnection, false) if errorBool { contBool := handleClientDC(wg, followerConnection, phase3Channel) if contBool { continue } else { return } } //updates sharedSecret sharedSecret = sha256.Sum256(sharedSecret[:]) clientKeys.SharedSecret = sharedSecret clientKeys, _, errorBool = handlePirQuery(clientKeys, clientConnection, followerConnection, int(subPhase[0]), false) if errorBool { contBool := handleClientDC(wg, followerConnection, phase3Channel) if contBool { continue } else { return } } } errorBool = getSendTweets(clientKeys, nil, clientConnection, followerConnection) if errorBool { contBool := handleClientDC(wg, followerConnection, phase3Channel) if contBool { continue } else { return } } wantsArchive, errorBool := readFrom(clientConnection, 1, followerConnection, 2) if errorBool { contBool := handleClientDC(wg, followerConnection, phase3Channel) if contBool { continue } else { return } } writeToWError(followerConnection, wantsArchive, nil, 0) if wantsArchive[0] == 1 && archiveTopicAmount > 0 { _, archiveQuerys, errorBool := handlePirQuery(clientKeys, clientConnection, followerConnection, -1, false) if errorBool { contBool := handleClientDC(wg, followerConnection, phase3Channel) if contBool { continue } else { return } } errorBool = getSendTweets(clientKeys, archiveQuerys, clientConnection, followerConnection) if errorBool { contBool := handleClientDC(wg, followerConnection, phase3Channel) if contBool { continue } else { return } } } //saves all changes for client m.Lock() clientData[clientConnection.RemoteAddr()] = clientKeys m.Unlock() phase1Channel <- clientConnection for { if time.Since(startTimeRound) < 2*maxTimePerRound || len(phase3Channel) > 0 { //this worker handles the next client if len(phase3Channel) > 0 { break //this worker waits for next client } else { time.Sleep(1 * time.Second) } //times up } else { //tells follower that this worker is done gotClient[0] = 0 writeToWError(followerConnection, gotClient, nil, 0) wg.Done() return } } } } //returns true if there is another client func handleClientDC(wg *sync.WaitGroup, followerConnection net.Conn, channel chan net.Conn) bool { //loop that waits for new client or leaves phase1 if time is up for { if time.Since(startTimeRound) < maxTimePerRound { //this worker handles the next client if len(channel) > 0 { return true //this worker waits for next client } else { time.Sleep(1 * time.Second) } //times up } else { //tells follower that this worker is done gotClient := make([]byte, 1) gotClient[0] = 0 writeTo(followerConnection, gotClient) wg.Done() return false } } } func createVirtualAddresses() []int { //array will be filled with unique random ascending values //adapted from: https://stackoverflow.com/questions/20039025/java-array-of-unique-randomly-generated-integers //+extraPositions to have a position to evaluate each received message arraySize := int(dbWriteSize) + extraPositions var maxInt int = int(math.Pow(2, 31)) virtualAddresses := make([]int, arraySize) for i := 0; i < arraySize; i++ { virtualAddresses[i] = mr.Intn(maxInt) for j := 0; j < i; j++ { if virtualAddresses[i] == virtualAddresses[j] { i-- break } } } sort.Ints(virtualAddresses) return virtualAddresses } func getSendVirtualAddress(pirQuery []byte, virtualAddresses []int, sharedSecret [32]byte, clientConnection, followerConnection net.Conn) bool { //xores all requested addresses into virtuallAddress virtualAddress := make([]byte, 4) for index, num := range pirQuery { if num == 1 { currentAddress := intToByte(virtualAddresses[index]) for i := 0; i < 4; i++ { virtualAddress[i] = virtualAddress[i] ^ currentAddress[i] } } } //xores the sharedSecret for i := 0; i < 4; i++ { virtualAddress[i] = virtualAddress[i] ^ sharedSecret[i] } virtualAddressFollower, _ := readFrom(followerConnection, 4, nil, 0) //xores the data from follower for i := 0; i < 4; i++ { virtualAddress[i] = virtualAddress[i] ^ virtualAddressFollower[i] } errorBool := writeToWError(clientConnection, virtualAddress, followerConnection, 5) return errorBool } func getSendTweets(clientKeys clientKeys, archiveQuerys [][]byte, clientConnection, followerConnection net.Conn) bool { tmpNeededSubscriptions := neededSubscriptions if tmpNeededSubscriptions > topicAmount { tmpNeededSubscriptions = topicAmount } if archiveQuerys != nil { tmpNeededSubscriptions = len(archiveQuerys) if tmpNeededSubscriptions > archiveTopicAmount { tmpNeededSubscriptions = archiveTopicAmount } } tweets := make([][]byte, tmpNeededSubscriptions) for i := 0; i < tmpNeededSubscriptions; i++ { //gets all requested tweets if archiveQuerys == nil { tweets[i] = lib.GetTweets(clientKeys.PirQuery[i], dataLength, 0, *clientKeys.PublicKey) } else { tweets[i] = lib.GetTweets(archiveQuerys[i], dataLength, 1, *clientKeys.PublicKey) } //expand sharedSecret so it is of right length expandBy := len(tweets[i]) / 32 var expandedSharedSecret []byte for i := 0; i < expandBy; i++ { expandedSharedSecret = append(expandedSharedSecret, clientKeys.SharedSecret[:]...) } //Xor's sharedSecret with all tweets lib.Xor(expandedSharedSecret[:], tweets[i]) blockLength := len(tweets[i]) receivedTweets, _ := readFrom(followerConnection, blockLength, nil, 0) lib.Xor(receivedTweets, tweets[i]) } //sends tweets to client for i := 0; i < tmpNeededSubscriptions; i++ { tweetsLengthBytes := intToByte(len(tweets[i])) if firstTweetSend && archiveQuerys == nil { firstTweetSend = false log.Println("sending", len(tweets[0]), "bytes of data") } errorBool := writeToWError(clientConnection, tweetsLengthBytes, followerConnection, 2) if errorBool { return true } errorBool = writeToWError(clientConnection, tweets[i], followerConnection, 2) if errorBool { return true } } return false } func handlePirQuery(clientKeys clientKeys, clientConnection net.Conn, followerConnection net.Conn, subPhase int, doAuditing bool) (clientKeys, [][]byte, bool) { clientPublicKey := clientKeys.PublicKey //gets the msg length msgLengthBytes, errorBool := readFrom(clientConnection, 4, followerConnection, 5) if errorBool { return clientKeys, nil, true } msgLength := byteToInt(msgLengthBytes) //gets the leader box leaderBox, errorBool := readFrom(clientConnection, msgLength, followerConnection, 5) if errorBool { return clientKeys, nil, true } //gets the follower box followerBox, errorBool := readFrom(clientConnection, msgLength, followerConnection, 5) if errorBool { return clientKeys, nil, true } tmpNeededSubscriptions := neededSubscriptions if tmpNeededSubscriptions > topicAmount { tmpNeededSubscriptions = topicAmount } tmpTopicAmount := topicAmount if subPhase == -1 { archiveNeededSubscriptions, errorBool := readFrom(clientConnection, 4, followerConnection, 5) if errorBool { return clientKeys, nil, true } writeToWError(followerConnection, archiveNeededSubscriptions, nil, 0) tmpNeededSubscriptions = byteToInt(archiveNeededSubscriptions) tmpTopicAmount = archiveTopicAmount if tmpNeededSubscriptions > archiveTopicAmount { tmpNeededSubscriptions = archiveTopicAmount } } if doAuditing { tmpNeededSubscriptions = 1 tmpTopicAmount = int(dbWriteSize) } //send length to follower writeToWError(followerConnection, msgLengthBytes, nil, 0) //send box to follower writeToWError(followerConnection, followerBox, nil, 0) var decryptNonce [24]byte copy(decryptNonce[:], leaderBox[:24]) decrypted, ok := box.Open(nil, leaderBox[24:], &decryptNonce, clientPublicKey, leaderPrivateKey) if !ok { fmt.Println("pirQuery decryption not ok") return clientKeys, nil, true } //if sharedSecret is send if subPhase == 0 { var tmpSharedSecret [32]byte for index := 0; index < 32; index++ { tmpSharedSecret[index] = decrypted[index] } clientKeys.SharedSecret = tmpSharedSecret decrypted = decrypted[32:] } if doAuditing { result := make([][]byte, 1) result[0] = decrypted return clientKeys, result, false } if subPhase == -1 { } //transforms byteArray to ints of wanted topics pirQueryFlattened := decrypted pirQuerys := make([][]byte, tmpNeededSubscriptions) for i := range pirQuerys { pirQuerys[i] = make([]byte, tmpTopicAmount) } for i := 0; i < tmpNeededSubscriptions; i++ { pirQuerys[i] = pirQueryFlattened[i*tmpTopicAmount : (i+1)*tmpTopicAmount] } //sets the pirQuery for the client in case whe are not archiving, and not Auditing if subPhase != -1 { clientKeys.PirQuery = pirQuerys } return clientKeys, pirQuerys, false } func transformBytesToStringArray(topicsAsBytes []byte) []string { var topics []string var topic string var position int = 0 for _, letter := range topicsAsBytes { if string(letter) == "," { topics[position] = topic topic = "" position++ } else { topic = topic + string(letter) } } return topics } func byteToInt(myBytes []byte) (x int) { x = int(myBytes[3])<<24 + int(myBytes[2])<<16 + int(myBytes[1])<<8 + int(myBytes[0]) return } //returns true if error occured func sendTopicLists(clientConnection, followerConnection net.Conn, setup bool) bool { for i := 0; i < 2; i++ { var topicList []byte if i == 0 { topicList, topicAmount = lib.GetTopicList(i) } else { topicList, archiveTopicAmount = lib.GetTopicList(i) } topicListLengthBytes := intToByte(len(topicList)) if !setup { err := writeToWError(clientConnection, topicListLengthBytes, followerConnection, 5) if err { return true } err = writeToWError(clientConnection, topicList, followerConnection, 5) if err { return true } } else { _, err := clientConnection.Write(topicListLengthBytes) if err != nil { return true } _, err = clientConnection.Write(topicList) if err != nil { return true } } } return false } //sends the array to the connection func writeTo(connection net.Conn, array []byte) { remainingLength := len(array) for remainingLength > 0 { if remainingLength >= mtu { _, err := connection.Write(array[:mtu]) if err != nil { panic(err) } array = array[mtu:] remainingLength -= mtu } else { _, err := connection.Write(array) if err != nil { panic(err) } remainingLength = 0 } } } func writeToWError(connection net.Conn, array []byte, followerConnection net.Conn, size int) bool { if connection.RemoteAddr().String() == follower { arrayWError := make([]byte, 1) arrayWError = append(arrayWError, array[:]...) remainingLength := len(arrayWError) for remainingLength > 0 { if remainingLength >= mtu { _, err := connection.Write(arrayWError[:mtu]) if err != nil { return handleError(connection, followerConnection, size, err) } arrayWError = arrayWError[mtu:] remainingLength -= mtu } else { _, err := connection.Write(arrayWError) if err != nil { return handleError(connection, followerConnection, size, err) } remainingLength = 0 } } } else { remainingLength := len(array) for remainingLength > 0 { if remainingLength >= mtu { _, err := connection.Write(array[:mtu]) if err != nil { return handleError(connection, followerConnection, size, err) } array = array[mtu:] remainingLength -= mtu } else { _, err := connection.Write(array) if err != nil { return handleError(connection, followerConnection, size, err) } remainingLength = 0 } } } return false } func handleError(connection, followerConnection net.Conn, size int, err error) bool { if err != nil { //lets follower know that client has disconnected unexpectedly if connection.RemoteAddr().String() != follower { if size > mtu { fmt.Println("have a look here") } fmt.Println("handleError", err) array := make([]byte, size) array[0] = 1 _, err = followerConnection.Write(array) if err != nil { panic(err) } return true } else { panic(err) } } return false } //reads an array which is returned and of size "size" from the connection //returns true if error occured func readFrom(connection net.Conn, size int, followerConnection net.Conn, sizeError int) ([]byte, bool) { var array []byte remainingSize := size for remainingSize > 0 { var err error toAppend := make([]byte, mtu) if remainingSize > mtu { _, err = connection.Read(toAppend) array = append(array, toAppend...) remainingSize -= mtu } else { _, err = connection.Read(toAppend[:remainingSize]) array = append(array, toAppend[:remainingSize]...) remainingSize = 0 } if err != nil { //lets follower know that client has disconnected unexpectedly if connection.RemoteAddr().String() != follower { fmt.Println(err) array := make([]byte, sizeError) array[0] = 1 _, err = followerConnection.Write(array) if err != nil { panic(err) } return nil, true } else { panic(err) } } } return array, false } func intToByte(myInt int) (retBytes []byte) { retBytes = make([]byte, 4) retBytes[3] = byte((myInt >> 24) & 0xff) retBytes[2] = byte((myInt >> 16) & 0xff) retBytes[1] = byte((myInt >> 8) & 0xff) retBytes[0] = byte(myInt & 0xff) return }