package main //#cgo CFLAGS: -fopenmp -O2 //#cgo LDFLAGS: -lcrypto -lm -fopenmp //#include "../c/dpf.h" //#include "../c/okv.h" //#include "../c/dpf.c" //#include "../c/okv.c" import "C" //ssssssssssssssss import ( "crypto/rand" "crypto/rsa" "crypto/sha256" "crypto/tls" "crypto/x509" "crypto/x509/pkix" "encoding/pem" "fmt" "math" "math/big" mr "math/rand" "net" "sort" "strconv" "sync" "time" lib "2PPS/lib" "unsafe" "golang.org/x/crypto/nacl/box" ) //this stores all neccessary information for each client type clientKeys struct { roundsParticipating int PublicKey *[32]byte SharedSecret [32]byte PirQuery [][]byte } var clientData = make(map[net.Addr]clientKeys) const follower string = "127.0.0.1:4442" var leaderPrivateKey *[32]byte var leaderPublicKey *[32]byte var followerPublicKey *[32]byte const maxNumberOfClients = 10000000 var topicList []byte var topicAmount int var archiveTopicAmount int // every roundsBeforeUpdate the client updates his pirQuery const roundsBeforeUpdate = 5 const neededSubscriptions = 1 const numThreads = 12 const dataLength = 64 var dbWriteSize int = 4 var maxTimePerRound time.Duration = 5 * time.Second //counts the number of rounds var round int = 1 var startTime time.Time //channel for goroutine communication with clients var phase1Channel = make(chan net.Conn, maxNumberOfClients) var phase3Channel = make(chan net.Conn, maxNumberOfClients) //variables for calculating the dbWrite size const publisherRounds int = 3 var publisherAmount float64 var publisherHistory [publisherRounds]int //todo! handle client dc during phase1/3 func main() { //prevents race conditions for wrtiting m := &sync.RWMutex{} generatedPublicKey, generatedPrivateKey, err := box.GenerateKey(rand.Reader) if err != nil { panic(err) } //why is this neccessary? leaderPrivateKey = generatedPrivateKey leaderPublicKey = generatedPublicKey C.initializeServer(C.int(numThreads)) //calls follower for setup conf := &tls.Config{ InsecureSkipVerify: true, } followerConnection, err := tls.Dial("tcp", follower, conf) if err != nil { panic(err) } followerConnection.SetDeadline(time.Time{}) //receives follower publicKey var tmpFollowerPubKey [32]byte _, err = followerConnection.Read(tmpFollowerPubKey[:]) if err != nil { panic(err) } followerPublicKey = &tmpFollowerPubKey //send publicKey to follower writeTo(followerConnection, leaderPublicKey[:]) //goroutine for accepting new clients go func() { leaderConnectionPrivateKey, err := rsa.GenerateKey(rand.Reader, 2048) if err != nil { panic(err) } // Generate a pem block with the private key keyPem := pem.EncodeToMemory(&pem.Block{ Type: "RSA PRIVATE KEY", Bytes: x509.MarshalPKCS1PrivateKey(leaderConnectionPrivateKey), }) tml := x509.Certificate{ // you can add any attr that you need NotBefore: time.Now(), NotAfter: time.Now().AddDate(5, 0, 0), // you have to generate a different serial number each execution SerialNumber: big.NewInt(123123), Subject: pkix.Name{ CommonName: "New Name", Organization: []string{"New Org."}, }, BasicConstraintsValid: true, } cert, err := x509.CreateCertificate(rand.Reader, &tml, &tml, &leaderConnectionPrivateKey.PublicKey, leaderConnectionPrivateKey) if err != nil { panic(err) } // Generate a pem block with the certificate certPem := pem.EncodeToMemory(&pem.Block{ Type: "CERTIFICATE", Bytes: cert, }) tlsCert, err := tls.X509KeyPair(certPem, keyPem) if err != nil { panic(err) } config := &tls.Config{Certificates: []tls.Certificate{tlsCert}} //listens for clients lnClients, err := tls.Listen("tcp", ":4441", config) if err != nil { panic(err) } defer lnClients.Close() for { clientConnection, err := lnClients.Accept() if err != nil { fmt.Println(err) } clientConnection.SetDeadline(time.Time{}) //sends topicList so client can participate in phase 3 asap errorBool := sendTopicLists(clientConnection, followerConnection, true) if errorBool { break } //send leader publicKey _, err = clientConnection.Write(leaderPublicKey[:]) if err != nil { fmt.Println(err) clientConnection.Close() break } //send follower publicKey _, err = clientConnection.Write(followerPublicKey[:]) if err != nil { fmt.Println(err) clientConnection.Close() break } var clientPublicKey *[32]byte var tmpClientPublicKey [32]byte //gets publicKey from client _, err = clientConnection.Read(tmpClientPublicKey[:]) if err != nil { fmt.Println(err) clientConnection.Close() break } clientPublicKey = &tmpClientPublicKey //this is the key for map(client data) remoteAddress := clientConnection.RemoteAddr() //pirQuery will be added in phase 3 //bs! only want to set roundsParticipating and answerAmount to 0, mb there is a better way //will work for now var emptyArray [32]byte var emptyByteArray [][]byte keys := clientKeys{0, clientPublicKey, emptyArray, emptyByteArray} m.Lock() clientData[remoteAddress] = keys m.Unlock() phase1Channel <- clientConnection } }() wg := &sync.WaitGroup{} //the current phase phase := make([]byte, 1) for { //phase1 startTime = time.Now() phase[0] = 1 fmt.Println("phase1") //creates a new write Db for this round for i := 0; i < dbWriteSize; i++ { C.createDb(C.int(1), C.int(dataLength)) } //creates a new db containing virtual addresses for auditing virtualAddresses := createVirtualAddresses() //send all virtualAddresses to follower for i := 0; i <= dbWriteSize; i++ { writeTo(followerConnection, intToByte(virtualAddresses[i])) } for id := 0; id < numThreads; id++ { wg.Add(1) followerConnection, err := tls.Dial("tcp", follower, conf) if err != nil { panic(err) } followerConnection.SetDeadline(time.Time{}) go phase1(id, phase, followerConnection, wg, m, startTime, virtualAddresses) } wg.Wait() fmt.Println("phase2") //phase2 followerConnection, err := tls.Dial("tcp", follower, conf) if err != nil { panic(err) } followerConnection.SetDeadline(time.Time{}) phase2(followerConnection) //phase3 fmt.Println("phase3") //no tweets -> continue to phase 1 and mb get tweets topicList, topicAmount = lib.GetTopicList(0) if len(topicList) == 0 { continue } phase[0] = 3 startTime = time.Now() for id := 0; id < numThreads; id++ { wg.Add(1) followerConnection, err := tls.Dial("tcp", follower, conf) if err != nil { panic(err) } followerConnection.SetDeadline(time.Time{}) go phase3(id, phase, followerConnection, wg, startTime, m) } wg.Wait() lib.CleanUpdbR(round) round++ } } func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGroup, m *sync.RWMutex, startTime time.Time, virtualAddresses []int) { roundAsBytes := intToByte(round) gotClient := make([]byte, 1) gotClient[0] = 0 //wait until time is up for len(phase1Channel) == 0 { if time.Since(startTime) > maxTimePerRound { //tells follower that this worker is done writeTo(followerConnection, gotClient) wg.Done() return } time.Sleep(1 * time.Second) } for clientConnection := range phase1Channel { gotClient[0] = 1 //tells follower that this worker got a clientConnection writeTo(followerConnection, gotClient) //sends clients publicKey to follower m.RLock() clientPublicKey := clientData[clientConnection.RemoteAddr()].PublicKey m.RUnlock() writeTo(followerConnection, clientPublicKey[:]) //setup the worker-specific db dbSize := int(C.dbSize) db := make([][]byte, dbSize) for i := 0; i < dbSize; i++ { db[i] = make([]byte, int(C.db[i].dataSize)) } //tells client that phase 1 has begun errorBool := writeToWError(clientConnection, phase, followerConnection, 5) if errorBool { contBool := handleClientDC(wg, followerConnection, phase1Channel) if contBool { continue } else { return } } //tells client current dbWriteSize errorBool = writeToWError(clientConnection, intToByte(dbWriteSize), followerConnection, 5) if errorBool { contBool := handleClientDC(wg, followerConnection, phase1Channel) if contBool { continue } else { return } } //tells client current round errorBool = writeToWError(clientConnection, roundAsBytes, followerConnection, 5) if errorBool { contBool := handleClientDC(wg, followerConnection, phase1Channel) if contBool { continue } else { return } } m.RLock() var clientKeys = clientData[clientConnection.RemoteAddr()] m.RUnlock() clientKeys, pirQuery, errorBool := handlePirQuery(clientKeys, clientConnection, followerConnection, 0, true) if errorBool { contBool := handleClientDC(wg, followerConnection, phase1Channel) if contBool { continue } else { return } } errorBool = getSendVirtualAddress(pirQuery[0], virtualAddresses, clientKeys.SharedSecret, clientConnection, followerConnection) if errorBool { contBool := handleClientDC(wg, followerConnection, phase1Channel) if contBool { continue } else { return } } m.Lock() clientData[clientConnection.RemoteAddr()] = clientKeys m.Unlock() //accept dpfQuery from client dpfLengthBytes, errorBool := readFrom(clientConnection, 4, followerConnection, 5) if errorBool { contBool := handleClientDC(wg, followerConnection, phase1Channel) if contBool { continue } else { return } } dpfLength := byteToInt(dpfLengthBytes) dpfQueryAEncrypted, errorBool := readFrom(clientConnection, dpfLength, followerConnection, 5) if errorBool { contBool := handleClientDC(wg, followerConnection, phase1Channel) if contBool { continue } else { return } } dpfQueryBEncrypted, errorBool := readFrom(clientConnection, dpfLength, followerConnection, 5) if errorBool { contBool := handleClientDC(wg, followerConnection, phase1Channel) if contBool { continue } else { return } } writeToWError(followerConnection, dpfLengthBytes, nil, 0) writeToWError(followerConnection, dpfQueryBEncrypted, nil, 0) //decrypt dpfQueryA for sorting into db var decryptNonce [24]byte copy(decryptNonce[:], dpfQueryAEncrypted[:24]) dpfQueryA, ok := box.Open(nil, dpfQueryAEncrypted[24:], &decryptNonce, clientPublicKey, leaderPrivateKey) if !ok { panic("dpfQueryA decryption not ok") } ds := int(C.db[0].dataSize) dataShareLeader := make([]byte, ds) pos := C.getUint128_t(C.int(virtualAddresses[dbWriteSize])) C.evalDPF(C.ctx[id], (*C.uchar)(&dpfQueryA[0]), pos, C.int(ds), (*C.uchar)(&dataShareLeader[0])) dataShareFollower, _ := readFrom(followerConnection, ds, nil, 0) writeToWError(followerConnection, dataShareLeader, nil, 0) auditXOR := make([]byte, ds) passedAudit := true for i := 0; i < ds; i++ { auditXOR[i] = dataShareLeader[i] ^ dataShareFollower[i] //client tried to write to a position that is not a virtuallAddress if auditXOR[i] != 0 { clientConnection.Close() passedAudit = false } } if passedAudit { //run dpf, xor into local db for i := 0; i < dbSize; i++ { ds := int(C.db[i].dataSize) dataShare := make([]byte, ds) pos := C.getUint128_t(C.int(virtualAddresses[i])) C.evalDPF(C.ctx[id], (*C.uchar)(&dpfQueryA[0]), pos, C.int(ds), (*C.uchar)(&dataShare[0])) for j := 0; j < ds; j++ { db[i][j] = db[i][j] ^ dataShare[j] } } //xor the worker's DB into the main DB for i := 0; i < dbSize; i++ { m.Lock() C.xorIn(C.int(i), (*C.uchar)(&db[i][0])) m.Unlock() } phase3Channel <- clientConnection } //loop that waits for new client or leaves phase1 if time is up for { if time.Since(startTime) < maxTimePerRound { //this worker handles the next client if len(phase1Channel) > 0 { break //this worker waits for next client } else { time.Sleep(1 * time.Second) } //times up } else { //tells follower that this worker is done gotClient[0] = 0 writeTo(followerConnection, gotClient) wg.Done() return } } } } func phase2(followerConnection net.Conn) { //gets current seed seedLeader := make([]byte, 16) C.readSeed((*C.uchar)(&seedLeader[0])) //get data dbSize := int(C.dbSize) tmpdbLeader := make([][]byte, dbSize) for i := range tmpdbLeader { tmpdbLeader[i] = make([]byte, dataLength) } for i := 0; i < dbSize; i++ { C.readData(C.int(i), (*C.uchar)(&tmpdbLeader[i][0])) } //writes seed to follower writeTo(followerConnection, seedLeader) //write data to follower //this is surely inefficent for i := 0; i < dbSize; i++ { writeTo(followerConnection, tmpdbLeader[i]) } //receive seed from follower seedFollower, _ := readFrom(followerConnection, 16, nil, 0) //receive data from follower tmpdbFollower := make([][]byte, dbSize) for i := range tmpdbFollower { tmpdbFollower[i] = make([]byte, dataLength) } for i := 0; i < dbSize; i++ { tmpdbFollower[i], _ = readFrom(followerConnection, dataLength, nil, 0) } //put together the db tmpdb := make([][]byte, dbSize) for i := range tmpdb { tmpdb[i] = make([]byte, dataLength) } //get own Ciphers ciphersLeader := make([]*C.uchar, dbSize) for i := 0; i < dbSize; i++ { ciphersLeader[i] = (*C.uchar)(C.malloc(16)) } for i := 0; i < dbSize; i++ { C.getCipher(1, C.int(i), ciphersLeader[i]) } //send own Ciphers to follower for i := 0; i < dbSize; i++ { writeTo(followerConnection, C.GoBytes(unsafe.Pointer(ciphersLeader[i]), 16)) } //receive ciphers from follower ciphersFollower := make([]byte, dbSize*16) for i := 0; i < dbSize; i++ { _, err := followerConnection.Read(ciphersFollower[i*16:]) if err != nil { panic(err) } } //put in ciphers from follower for i := 0; i < dbSize; i++ { C.putCipher(1, C.int(i), (*C.uchar)(&ciphersFollower[i*16])) } //decrypt each row for i := 0; i < dbSize; i++ { C.decryptRow(C.int(i), (*C.uchar)(&tmpdb[i][0]), (*C.uchar)(&tmpdbLeader[i][0]), (*C.uchar)(&tmpdbFollower[i][0]), (*C.uchar)(&seedLeader[0]), (*C.uchar)(&seedFollower[0])) } var tweets []lib.Tweet var currentPublisherAmount int = 0 for i := 0; i < dbSize; i++ { //discard cover message if tmpdb[i][0] == 0 { continue } else { currentPublisherAmount++ //reconstruct tweet var position int = 0 var topics []string var topic string var text string for _, letter := range tmpdb[i] { if string(letter) == ";" { if topic != "" { topics = append(topics, topic) topic = "" } position++ } else { if position == 0 { if string(letter) == "," { topics = append(topics, topic) topic = "" } else { topic = topic + string(letter) } } else if position == 1 { text = text + string(letter) } } } tweet := lib.Tweet{"", -1, topics, text, round} tweets = append(tweets, tweet) } } //fmt.Println("tweets recovered: ", tweets) //sort into read db lib.NewEntries(tweets, 0) C.resetDb() //calculates the publisherAverage over the last publisherRounds rounds index := round % publisherRounds publisherHistory[index] = currentPublisherAmount var publisherAmount int for _, num := range publisherHistory { publisherAmount += num } publisherAverage := 0 if round < publisherRounds { publisherAverage = publisherAmount / round } else { publisherAverage = publisherAmount / publisherRounds } //calculates the dbWriteSize for this round dbWriteSize = int(math.Ceil(19.5 * float64(publisherAverage))) //writes dbWriteSize of current round to follower writeTo(followerConnection, intToByte(dbWriteSize)) } func addTestTweets() { //creates test tweets tweets := make([]lib.Tweet, 5) for i := range tweets { j := i if i == 1 { j = 0 } text := "Text " + strconv.Itoa(i) var topics []string topics = append(topics, "Topic "+strconv.Itoa(j)) tweets[i] = lib.Tweet{"", -1, topics, text, i} } lib.NewEntries(tweets, 0) } //opti! mb it is quicker to send updated topicLists to clients first so pirQuerys are ready func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGroup, startTime time.Time, m *sync.RWMutex) { gotClient := make([]byte, 1) gotClient[0] = 0 //wait until time is up for len(phase3Channel) == 0 { if time.Since(startTime) > maxTimePerRound { //tells follower that this worker is done writeToWError(followerConnection, gotClient, nil, 0) wg.Done() return } time.Sleep(1 * time.Second) } for clientConnection := range phase3Channel { gotClient[0] = 1 //tells follower that this worker got a clientConnection writeToWError(followerConnection, gotClient, nil, 0) //tells client current phase errorBool := writeToWError(clientConnection, phase, followerConnection, 2) if errorBool { contBool := handleClientDC(wg, followerConnection, phase3Channel) if contBool { continue } else { return } } /* possible Values 0 : new client leader expects sharedSecrets, expects pirQuery 1 : update needed leader sends topicList, performs local update of sharedSecret, expects pirQuery 2 : no update needed nothing */ subPhase := make([]byte, 1) //gets the data for the current client m.RLock() var clientKeys = clientData[clientConnection.RemoteAddr()] m.RUnlock() var roundsParticipating = clientKeys.roundsParticipating //client participates for the first time if roundsParticipating == 0 { subPhase[0] = 0 } else if roundsParticipating%roundsBeforeUpdate == 0 { subPhase[0] = 1 } else { subPhase[0] = 2 } //tells client what leader expects errorBool = writeToWError(clientConnection, subPhase, followerConnection, 2) if errorBool { contBool := handleClientDC(wg, followerConnection, phase3Channel) if contBool { continue } else { return } } //tells follower what will happen writeToWError(followerConnection, subPhase, nil, 0) //sends clients publicKey so follower knows which client is being served writeTo(followerConnection, clientKeys.PublicKey[:]) //increases rounds participating for client clientKeys.roundsParticipating = roundsParticipating + 1 //declaring variables here to prevent dupclicates later m.RLock() var sharedSecret [32]byte = clientData[clientConnection.RemoteAddr()].SharedSecret m.RUnlock() if subPhase[0] == 0 { errorBool := sendTopicLists(clientConnection, followerConnection, false) if errorBool { contBool := handleClientDC(wg, followerConnection, phase3Channel) if contBool { continue } else { return } } clientKeys, _, errorBool = handlePirQuery(clientKeys, clientConnection, followerConnection, int(subPhase[0]), false) if errorBool { contBool := handleClientDC(wg, followerConnection, phase3Channel) if contBool { continue } else { return } } } else if subPhase[0] == 1 { errorBool := sendTopicLists(clientConnection, followerConnection, false) if errorBool { contBool := handleClientDC(wg, followerConnection, phase3Channel) if contBool { continue } else { return } } //updates sharedSecret sharedSecret = sha256.Sum256(sharedSecret[:]) clientKeys.SharedSecret = sharedSecret clientKeys, _, errorBool = handlePirQuery(clientKeys, clientConnection, followerConnection, int(subPhase[0]), false) if errorBool { contBool := handleClientDC(wg, followerConnection, phase3Channel) if contBool { continue } else { return } } } errorBool = getSendTweets(clientKeys, nil, clientConnection, followerConnection) if errorBool { contBool := handleClientDC(wg, followerConnection, phase3Channel) if contBool { continue } else { return } } wantsArchive, errorBool := readFrom(clientConnection, 1, followerConnection, 2) if errorBool { contBool := handleClientDC(wg, followerConnection, phase3Channel) if contBool { continue } else { return } } writeToWError(followerConnection, wantsArchive, nil, 0) if wantsArchive[0] == 1 && archiveTopicAmount > 0 { _, archiveQuerys, errorBool := handlePirQuery(clientKeys, clientConnection, followerConnection, -1, false) if errorBool { contBool := handleClientDC(wg, followerConnection, phase3Channel) if contBool { continue } else { return } } errorBool = getSendTweets(clientKeys, archiveQuerys, clientConnection, followerConnection) if errorBool { contBool := handleClientDC(wg, followerConnection, phase3Channel) if contBool { continue } else { return } } } //saves all changes for client m.Lock() clientData[clientConnection.RemoteAddr()] = clientKeys m.Unlock() phase1Channel <- clientConnection for { if time.Since(startTime) < maxTimePerRound { //this worker handles the next client if len(phase3Channel) > 0 { break //this worker waits for next client } else { time.Sleep(1 * time.Second) } //times up } else { //tells follower that this worker is done gotClient[0] = 0 writeToWError(followerConnection, gotClient, nil, 0) wg.Done() return } } } } //returns true if there is another client func handleClientDC(wg *sync.WaitGroup, followerConnection net.Conn, channel chan net.Conn) bool { //loop that waits for new client or leaves phase1 if time is up for { if time.Since(startTime) < maxTimePerRound { //this worker handles the next client if len(channel) > 0 { return true //this worker waits for next client } else { time.Sleep(1 * time.Second) } //times up } else { //tells follower that this worker is done gotClient := make([]byte, 1) gotClient[0] = 0 writeTo(followerConnection, gotClient) wg.Done() return false } } } func createVirtualAddresses() []int { //array will be filled with unique random ascending values //adapted from: https://stackoverflow.com/questions/20039025/java-array-of-unique-randomly-generated-integers //+1 to have a position to evaluate each received message arraySize := dbWriteSize + 1 var maxInt int = int(math.Pow(2, 31)) virtualAddresses := make([]int, arraySize) for i := 0; i < arraySize; i++ { virtualAddresses[i] = mr.Intn(maxInt) for j := 0; j < i; j++ { if virtualAddresses[i] == virtualAddresses[j] { i-- break } } } sort.Ints(virtualAddresses) return virtualAddresses } func getSendVirtualAddress(pirQuery []byte, virtualAddresses []int, sharedSecret [32]byte, clientConnection, followerConnection net.Conn) bool { //xores all requested addresses into virtuallAddress virtualAddress := make([]byte, 4) for index, num := range pirQuery { if num == 1 { currentAddress := intToByte(virtualAddresses[index]) for i := 0; i < 4; i++ { virtualAddress[i] = virtualAddress[i] ^ currentAddress[i] } } } //xores the sharedSecret for i := 0; i < 4; i++ { virtualAddress[i] = virtualAddress[i] ^ sharedSecret[i] } virtualAddressFollower, _ := readFrom(followerConnection, 4, nil, 0) //xores the data from follower for i := 0; i < 4; i++ { virtualAddress[i] = virtualAddress[i] ^ virtualAddressFollower[i] } errorBool := writeToWError(clientConnection, virtualAddress, followerConnection, 5) return errorBool } func getSendTweets(clientKeys clientKeys, archiveQuerys [][]byte, clientConnection, followerConnection net.Conn) bool { tmpNeededSubscriptions := neededSubscriptions if archiveQuerys != nil { tmpNeededSubscriptions = len(archiveQuerys) } tweets := make([][]byte, tmpNeededSubscriptions) for i := 0; i < tmpNeededSubscriptions; i++ { //gets all requested tweets if archiveQuerys == nil { tweets[i] = lib.GetTweets(clientKeys.PirQuery[i], dataLength, 0) } else { tweets[i] = lib.GetTweets(archiveQuerys[i], dataLength, 1) } //expand sharedSecret so it is of right length expandBy := len(tweets[i]) / 32 var expandedSharedSecret []byte for i := 0; i < expandBy; i++ { expandedSharedSecret = append(expandedSharedSecret, clientKeys.SharedSecret[:]...) } //Xor's sharedSecret with all tweets lib.Xor(expandedSharedSecret[:], tweets[i]) //receives tweets from follower and Xor's them in tweetsLengthBytes, _ := readFrom(followerConnection, 4, nil, 0) tweetsReceivedLength := byteToInt(tweetsLengthBytes) receivedTweets, _ := readFrom(followerConnection, tweetsReceivedLength, nil, 0) lib.Xor(receivedTweets, tweets[i]) } //sends tweets to client for i := 0; i < tmpNeededSubscriptions; i++ { tweetsLengthBytes := intToByte(len(tweets[i])) errorBool := writeToWError(clientConnection, tweetsLengthBytes, followerConnection, 2) if errorBool { return true } errorBool = writeToWError(clientConnection, tweets[i], followerConnection, 2) if errorBool { return true } } return false } func handlePirQuery(clientKeys clientKeys, clientConnection net.Conn, followerConnection net.Conn, subPhase int, doAuditing bool) (clientKeys, [][]byte, bool) { clientPublicKey := clientKeys.PublicKey //gets the msg length msgLengthBytes, errorBool := readFrom(clientConnection, 4, followerConnection, 5) if errorBool { return clientKeys, nil, true } msgLength := byteToInt(msgLengthBytes) //gets the leader box leaderBox, errorBool := readFrom(clientConnection, msgLength, followerConnection, 5) if errorBool { return clientKeys, nil, true } //gets the follower box followerBox, errorBool := readFrom(clientConnection, msgLength, followerConnection, 5) if errorBool { return clientKeys, nil, true } tmpNeededSubscriptions := neededSubscriptions tmpTopicAmount := topicAmount if subPhase == -1 { archiveNeededSubscriptions, errorBool := readFrom(clientConnection, 4, followerConnection, 5) if errorBool { return clientKeys, nil, true } writeToWError(followerConnection, archiveNeededSubscriptions, nil, 0) tmpNeededSubscriptions = byteToInt(archiveNeededSubscriptions) tmpTopicAmount = archiveTopicAmount } if doAuditing { tmpNeededSubscriptions = 1 tmpTopicAmount = dbWriteSize } //send length to follower writeToWError(followerConnection, msgLengthBytes, nil, 0) //send box to follower writeToWError(followerConnection, followerBox, nil, 0) var decryptNonce [24]byte copy(decryptNonce[:], leaderBox[:24]) decrypted, ok := box.Open(nil, leaderBox[24:], &decryptNonce, clientPublicKey, leaderPrivateKey) if !ok { panic("pirQuery decryption not ok") } //if sharedSecret is send if subPhase == 0 { var tmpSharedSecret [32]byte for index := 0; index < 32; index++ { tmpSharedSecret[index] = decrypted[index] } clientKeys.SharedSecret = tmpSharedSecret decrypted = decrypted[32:] } if doAuditing { result := make([][]byte, 1) result[0] = decrypted return clientKeys, result, false } //transforms byteArray to ints of wanted topics pirQueryFlattened := decrypted pirQuerys := make([][]byte, tmpNeededSubscriptions) for i := range pirQuerys { pirQuerys[i] = make([]byte, tmpTopicAmount) } for i := 0; i < tmpNeededSubscriptions; i++ { pirQuerys[i] = pirQueryFlattened[i*tmpTopicAmount : (i+1)*tmpTopicAmount] } //sets the pirQuery for the client in case whe are not archiving, and not Auditing if subPhase != -1 { clientKeys.PirQuery = pirQuerys } return clientKeys, pirQuerys, false } func transformBytesToStringArray(topicsAsBytes []byte) []string { var topics []string var topic string var position int = 0 for _, letter := range topicsAsBytes { if string(letter) == "," { topics[position] = topic topic = "" position++ } else { topic = topic + string(letter) } } return topics } func byteToInt(myBytes []byte) (x int) { x = int(myBytes[3])<<24 + int(myBytes[2])<<16 + int(myBytes[1])<<8 + int(myBytes[0]) return } //returns true if error occured func sendTopicLists(clientConnection, followerConnection net.Conn, setup bool) bool { for i := 0; i < 2; i++ { var topicList []byte if i == 0 { topicList, topicAmount = lib.GetTopicList(i) } else { topicList, archiveTopicAmount = lib.GetTopicList(i) } topicListLengthBytes := intToByte(len(topicList)) if !setup { err := writeToWError(clientConnection, topicListLengthBytes, followerConnection, 5) if err { return true } err = writeToWError(clientConnection, topicList, followerConnection, 5) if err { return true } } else { _, err := clientConnection.Write(topicListLengthBytes) if err != nil { return true } _, err = clientConnection.Write(topicList) if err != nil { return true } } } return false } //sends the array to the connection func writeTo(connection net.Conn, array []byte) { _, err := connection.Write(array) if err != nil { panic(err) } } func writeToWError(connection net.Conn, array []byte, followerConnection net.Conn, size int) bool { var err error if connection.RemoteAddr().String() == follower { arrayWError := make([]byte, 1) arrayWError = append(arrayWError, array[:]...) _, err = connection.Write(arrayWError) } else { _, err = connection.Write(array) } if err != nil { //lets follower know that client has disconnected unexpectedly if connection.RemoteAddr().String() != follower { fmt.Println(err) array := make([]byte, size) array[0] = 1 _, err = followerConnection.Write(array) if err != nil { panic(err) } return true } else { panic(err) } } return false } //reads an array which is returned and of size "size" from the connection //returns true if error occured func readFrom(connection net.Conn, size int, followerConnection net.Conn, sizeError int) ([]byte, bool) { array := make([]byte, size) _, err := connection.Read(array) if err != nil { //lets follower know that client has disconnected unexpectedly if connection.RemoteAddr().String() != follower { fmt.Println(err) array := make([]byte, sizeError) array[0] = 1 _, err = followerConnection.Write(array) if err != nil { panic(err) } return nil, true } else { panic(err) } } return array, false } func intToByte(myInt int) (retBytes []byte) { retBytes = make([]byte, 4) retBytes[3] = byte((myInt >> 24) & 0xff) retBytes[2] = byte((myInt >> 16) & 0xff) retBytes[1] = byte((myInt >> 8) & 0xff) retBytes[0] = byte(myInt & 0xff) return }