package main /* #cgo CFLAGS: -O2 #cgo LDFLAGS: -lcrypto -lm #include "../c/dpf.h" #include "../c/okvClient.h" #include "../c/dpf.c" #include "../c/okvClient.c" */ import "C" import ( "2PPS/lib" "bufio" "bytes" "crypto/rand" "crypto/sha256" "crypto/tls" "encoding/json" "fmt" "log" "math/big" mr "math/rand" "net" "os" "sort" "strconv" "strings" "sync" "time" "unsafe" "golang.org/x/crypto/nacl/box" ) type tweet struct { Topics []string Text string } const leader string = "127.0.0.1:4441" //needs to be changed at leader/client at the same time const numClients = 1000 //mylimit=8000 //sudo prlimit --nofile=$mylimit --pid $$; ulimit -n $mylimit //for every terminal const dataLength int = 256 //Maximum Transport Unit const mtu int = 1100 var dbWriteSize int var round int var topicList []string var archiveTopicList []string var neededSubscriptions int var publisherAmount int var goodPadding int var blocksReceived int var timeBounds []float64 //this translates to a simulated round length of ~2h var speedUp float64 = 7200 / ((maxTimePerRound.Seconds()) * 3) var maxTimePerRound time.Duration = 1000 * time.Second var startTime int var archiveInterests = make([]int, 1) var sharedSecret [numClients][2][32]byte = createSharedSecret() var wantsArchive = make([]byte, 1) var leaderPublicKey *[32]byte var followerPublicKey *[32]byte var clientPrivateKey [numClients]*[32]byte var clientPublicKey [numClients]*[32]byte func main() { f, err := os.OpenFile("evalDataClient", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) if err != nil { log.Fatalf("error opening file: %v", err) } defer f.Close() log.SetOutput(f) log.Println("simulated Duration", speedUp) wg := &sync.WaitGroup{} getTimeBounds() for i := 0; i < numClients; i++ { wg.Add(1) go client(i, f) time.Sleep(1 * time.Millisecond) } wg.Wait() } func client(clientNumber int, f *os.File) { generatedPublicKey, generatedPrivateKey, err := box.GenerateKey(rand.Reader) if err != nil { panic(err) } clientPrivateKey[clientNumber] = generatedPrivateKey clientPublicKey[clientNumber] = generatedPublicKey C.initializeCipher() //initializes the connection to the leader conf := &tls.Config{ InsecureSkipVerify: true, } leaderConn, err := tls.Dial("tcp", leader, conf) if err != nil { panic(err) } leaderConn.SetDeadline(time.Time{}) //receives topics first so client can participate asap receiveTopicLists(leaderConn) //gets the public keys of both servers var tmpLeaderPubKey [32]byte _, err = leaderConn.Read(tmpLeaderPubKey[:]) if err != nil { panic(err) } leaderPublicKey = &tmpLeaderPubKey var tmpFollowerPubKey [32]byte _, err = leaderConn.Read(tmpFollowerPubKey[:]) if err != nil { panic(err) } followerPublicKey = &tmpFollowerPubKey //sends own public key writeTo(leaderConn, clientPublicKey[clientNumber][:]) neededSubscriptionsBytes := readFrom(leaderConn, 4) neededSubscriptions = byteToInt(neededSubscriptionsBytes) startTimeBytes := readFrom(leaderConn, 4) startTime = byteToInt(startTimeBytes) //setup ends above //while client is active he is always connected and has to participate for { //gets current phase phase := readFrom(leaderConn, 1) if phase[0] == 1 { //gets current dbWriteSize from leader dbWriteSizeBytes := readFrom(leaderConn, 4) dbWriteSize = byteToInt(dbWriteSizeBytes) //roundAsBytes := readFrom(leaderConn, 4) roundAsBytes := make([]byte, 4) _, err = leaderConn.Read(roundAsBytes) if err != nil { panic(err) } round = byteToInt(roundAsBytes) if clientNumber == 0 { fmt.Println("Round ", round) } //request virtualAddress from leader via pirQuery encryptedQueryLeader, encryptedQueryFollower := createAuditPIRQuery(clientNumber) sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false) pos := receiveVirtualAddress(sharedSecret[clientNumber], leaderConn) tweet := getRealTweet(clientNumber) if clientNumber == numClients-1 { log.Println("Round", round) log.Println("publisherAmount", publisherAmount) log.Println("goodPadding", goodPadding) log.Println("blocksReceived", blocksReceived) log.Println("goodPadding Percentage", float64(goodPadding)/float64(blocksReceived)) publisherAmount = 0 } //prep the query dataSize := len(tweet) querySize := make([]byte, 4) cQuerySize := C.int(byteToInt(querySize)) var dpfQueryA *C.uchar var dpfQueryB *C.uchar C.prepQuery(C.int(pos), C.int(dbWriteSize), (*C.uchar)(&tweet[0]), C.int(dataSize), &cQuerySize, &dpfQueryA, &dpfQueryB) intQuerySize := int(cQuerySize) //byteToInt(querySize) //write the query queryAPlaintext := C.GoBytes(unsafe.Pointer(dpfQueryA), C.int(intQuerySize)) //encrypts queryA and appends it to message var nonce [24]byte //fill nonce with randomness _, err = rand.Read(nonce[:]) if err != nil { panic("couldn't get randomness for nonce!") } dpfQueryAEncrypted := box.Seal(nonce[:], queryAPlaintext, &nonce, leaderPublicKey, clientPrivateKey[clientNumber]) //encrypts queryB and appends it to message queryBPlaintext := C.GoBytes(unsafe.Pointer(dpfQueryB), C.int(intQuerySize)) //fill nonce with randomness _, err = rand.Read(nonce[:]) if err != nil { panic("couldn't get randomness for nonce!") } dpfQueryBEncrypted := box.Seal(nonce[:], queryBPlaintext, &nonce, followerPublicKey, clientPrivateKey[clientNumber]) //writes the dpfQuery to the leader dpfLengthBytes := intToByte(len(dpfQueryAEncrypted)) writeTo(leaderConn, dpfLengthBytes) writeTo(leaderConn, dpfQueryAEncrypted) writeTo(leaderConn, dpfQueryBEncrypted) C.free(unsafe.Pointer(dpfQueryA)) C.free(unsafe.Pointer(dpfQueryB)) } else if phase[0] == 3 { /* possible Values 0 : new client leader expects sharedSecrets, expects pirQuery 1 : update needed leader sends topicList, performs local update of sharedSecret, expects pirQuery 2 : no update needed nothing */ subPhase := readFrom(leaderConn, 1) var encryptedQueryLeader, encryptedQueryFollower []byte //first time participating if subPhase[0] == 0 { receiveTopicLists(leaderConn) encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(int(subPhase[0]), clientNumber) sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false) } //updates the topic list and what client is interested in if subPhase[0] == 1 { receiveTopicLists(leaderConn) //updates local secret for index := 0; index < 2; index++ { sharedSecret[clientNumber][index] = sha256.Sum256(sharedSecret[clientNumber][index][:]) } encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(int(subPhase[0]), clientNumber) sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false) } receiveTweets(sharedSecret[clientNumber], leaderConn, false, clientNumber) if len(archiveTopicList) > 0 { wantsArchive[0] = 0 } else { wantsArchive[0] = 0 } writeTo(leaderConn, wantsArchive) if wantsArchive[0] == 1 && len(archiveTopicList) > 0 { encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(-1, clientNumber) sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, true) receiveTweets(sharedSecret[clientNumber], leaderConn, true, clientNumber) } } else { fmt.Println("Phase", phase) panic("somethin went wrong") } } } //creates and sends the pirQuerys for each server func createPIRQuery(subPhase int, clientNumber int) ([]byte, []byte) { //later this will be taken from gui, this is only for testing topicsOfInterest := make([]int, 1) topicsOfInterest[0] = mr.Intn(10) archiveInterests[0] = mr.Intn(10) tmpNeededSubscriptions := neededSubscriptions if tmpNeededSubscriptions > len(topicList) { tmpNeededSubscriptions = len(topicList) } tmptopicsOfInterest := make([]int, len(topicsOfInterest)) copy(tmptopicsOfInterest, topicsOfInterest) tmpTopicList := make([]string, len(topicList)) copy(tmpTopicList, topicList) if wantsArchive[0] == 1 && subPhase == -1 { tmpNeededSubscriptions = len(archiveInterests) if tmpNeededSubscriptions > len(archiveTopicList) { tmpNeededSubscriptions = len(archiveTopicList) } tmptopicsOfInterest = archiveInterests //todo! take archiveInterests from gui tmpTopicList = archiveTopicList } //creates fake topicsOfInterest if client is boooring if len(tmptopicsOfInterest) < tmpNeededSubscriptions && subPhase != -1 { tmptopicsOfInterest = addFakeInterests(len(tmpTopicList), tmptopicsOfInterest, false) } //pirQuery [topicsOfInterest][serverAmount][topicAmount]byte pirQuerys := make([][][]byte, len(tmptopicsOfInterest)) for i := range pirQuerys { pirQuerys[i] = make([][]byte, 2) for j := range pirQuerys[i] { pirQuerys[i][j] = make([]byte, len(tmpTopicList)) } } //for leader //pirQuery will be filled with random bits for topic := range tmptopicsOfInterest { for index := range tmpTopicList { bit, err := rand.Int(rand.Reader, big.NewInt(2)) if err != nil { panic(err) } pirQuerys[topic][0][index] = byte(bit.Int64()) } } tmptopicsOfInterestBytes := make([]byte, len(tmpTopicList)) for index := range tmptopicsOfInterest { if tmptopicsOfInterest[index] == 1 { tmptopicsOfInterestBytes[index] = 1 } } for topicIndex, topic := range tmptopicsOfInterest { for index := range tmpTopicList { if topic == index { if pirQuerys[topicIndex][0][index] == 1 { pirQuerys[topicIndex][1][index] = 0 } else { pirQuerys[topicIndex][1][index] = 1 } } else { if pirQuerys[topicIndex][0][index] == 0 { pirQuerys[topicIndex][1][index] = 0 } else { pirQuerys[topicIndex][1][index] = 1 } } } } //flattens the querys to be able to send them more efficently messagesFlattened := make([][]byte, 2) //adds the sharedSecret to the first pirQuery when first time participating if subPhase == 0 { for server := 0; server < 2; server++ { messagesFlattened[server] = append(messagesFlattened[server], sharedSecret[clientNumber][server][:]...) } } for server := range messagesFlattened { for topic := range pirQuerys { messagesFlattened[server] = append(messagesFlattened[server], pirQuerys[topic][server]...) } } var nonce [24]byte _, err := rand.Read(nonce[:]) if err != nil { panic("couldn't get randomness for nonce!") } encryptedQueryLeader := box.Seal(nonce[:], messagesFlattened[0], &nonce, leaderPublicKey, clientPrivateKey[clientNumber]) _, err = rand.Read(nonce[:]) if err != nil { panic("couldn't get randomness for nonce!") } encryptedQueryFollower := box.Seal(nonce[:], messagesFlattened[1], &nonce, followerPublicKey, clientPrivateKey[clientNumber]) return encryptedQueryLeader, encryptedQueryFollower } func sendQuerys(encryptedQueryLeader, encryptedQueryFollower []byte, leaderConn net.Conn, getArchive bool) { encryptedLength := len(encryptedQueryLeader) //sends the pirQuerysLength to the leader writeTo(leaderConn, intToByte(encryptedLength)) //sends the pirQuerys to the leader writeTo(leaderConn, encryptedQueryLeader) writeTo(leaderConn, encryptedQueryFollower) if getArchive { writeTo(leaderConn, intToByte(len(archiveInterests))) } } func receiveVirtualAddress(sharedSecret [2][32]byte, leaderConn net.Conn) int { virtualAddressByte := readFrom(leaderConn, 4) //xores the sharedSecret for h := 0; h < 2; h++ { for i := 0; i < 4; i++ { virtualAddressByte[i] = virtualAddressByte[i] ^ sharedSecret[h][i] } } return byteToInt(virtualAddressByte) } func receiveTweets(sharedSecret [2][32]byte, leaderConn net.Conn, getArchive bool, clientNumber int) { tmpNeededSubscriptions := neededSubscriptions if tmpNeededSubscriptions > len(topicList) { tmpNeededSubscriptions = len(topicList) } if getArchive { tmpNeededSubscriptions = len(archiveInterests) if tmpNeededSubscriptions > len(archiveTopicList) { tmpNeededSubscriptions = len(archiveTopicList) } } for i := 0; i < tmpNeededSubscriptions; i++ { if !getArchive { blocksReceived++ } //client receives tweets tweetsLengthBytes := readFrom(leaderConn, 4) tweetsLength := byteToInt(tweetsLengthBytes) tweets := readFrom(leaderConn, tweetsLength) //expand sharedSecret so it is of right length expandBy := len(tweets) / 32 expandedSharedSecrets := make([][]byte, 2) for i := 0; i < 2; i++ { for j := 0; j < expandBy; j++ { expandedSharedSecrets[i] = append(expandedSharedSecrets[i], sharedSecret[i][:]...) } } //xors the received messge into the message to display for i := 0; i < 2; i++ { lib.Xor(expandedSharedSecrets[i][:], tweets) } index := strings.Index(string(tweets), ";;") if index != -1 { textArr := strings.Split(string(tweets), ";;;") text := textArr[:len(textArr)-1] if text[1] != "" { text[1] = text[1][1:] } ok := strings.Contains(text[0], text[1]) if !ok { goodPadding++ } } else if index == -1 && tweets[0] != 0 { fmt.Println("error") fmt.Println("round", round, string(tweets), "length", len(tweets)) return } } } //creates a shared secret for each server func createSharedSecret() [numClients][2][32]byte { var tmpSharedSecret [numClients][2][32]byte for i := 0; i < numClients; i++ { for j := 0; j < 2; j++ { _, err := rand.Read(tmpSharedSecret[i][j][:]) if err != nil { panic("couldn't get randomness for sharedSecret!") } } } return tmpSharedSecret } func createAuditPIRQuery(clientNumber int) ([]byte, []byte) { //pirQuery [serverAmount][dbWriteSize]byte pirQuerys := make([][]byte, 2) for i := range pirQuerys { pirQuerys[i] = make([]byte, dbWriteSize) } //for leader //pirQuery will be filled with random bits for index := range pirQuerys[0] { bit := mr.Intn(2) pirQuerys[0][index] = byte(bit) } copy(pirQuerys[1], pirQuerys[0]) //the positon the virtual address will be taken from pos := mr.Intn(dbWriteSize) pirQuerys[0][pos] = 1 pirQuerys[1][pos] = 0 //flattens the querys to be able to send them more efficently messagesFlattened := make([][]byte, 2) //adds the sharedSecret to the pirQuery for server := 0; server < 2; server++ { messagesFlattened[server] = append(messagesFlattened[server], sharedSecret[clientNumber][server][:]...) } for server := 0; server < 2; server++ { messagesFlattened[server] = append(messagesFlattened[server], pirQuerys[server][:]...) } var nonce [24]byte _, err := rand.Read(nonce[:]) if err != nil { panic("couldn't get randomness for nonce!") } encryptedQueryLeader := box.Seal(nonce[:], messagesFlattened[0], &nonce, leaderPublicKey, clientPrivateKey[clientNumber]) _, err = rand.Read(nonce[:]) if err != nil { panic("couldn't get randomness for nonce!") } encryptedQueryFollower := box.Seal(nonce[:], messagesFlattened[1], &nonce, followerPublicKey, clientPrivateKey[clientNumber]) return encryptedQueryLeader, encryptedQueryFollower } //generates a topicOfInterest array with random values func addFakeInterests(max int, topicsOfInterest []int, doAuditing bool) []int { tmpNeededSubscriptions := neededSubscriptions if tmpNeededSubscriptions > len(topicList) { tmpNeededSubscriptions = len(topicList) } faketopicsOfInterest := make([]int, tmpNeededSubscriptions) maxInt := max //fills the array with unique random ascending values ranging from 0 to max for i := 0; i < tmpNeededSubscriptions; i++ { faketopicsOfInterest[i] = mr.Intn(maxInt) for j := 0; j < i; j++ { if faketopicsOfInterest[i] == faketopicsOfInterest[j] { i-- break } } } if doAuditing { sort.Ints(faketopicsOfInterest) return faketopicsOfInterest } //adds unique and new random numbers to topicOfInterests until length is satisfied for _, number := range faketopicsOfInterest { if !inList(number, topicsOfInterest) { topicsOfInterest = append(topicsOfInterest, number) } if len(topicsOfInterest) == tmpNeededSubscriptions { break } } sort.Ints(topicsOfInterest) return topicsOfInterest } func inList(number int, list []int) bool { for _, element := range list { if element == number { return true } } return false } func receiveTopicLists(leaderConn net.Conn) { for i := 0; i < 2; i++ { topicListLength := readFrom(leaderConn, 4) recTopicList := readFrom(leaderConn, byteToInt(topicListLength)) var tmpTopicList []string arrayReader := bytes.NewReader(recTopicList[:]) json.NewDecoder(arrayReader).Decode(&tmpTopicList) if i == 0 { topicList = tmpTopicList } else { archiveTopicList = tmpTopicList } } } func getRealTweet(clientNumber int) []byte { fUserList, err := os.Open("/home/simon/goCode/tweets/userList") if err != nil { panic(err) } defer fUserList.Close() currentLine := 0 scanner := bufio.NewScanner(fUserList) userID := "" for scanner.Scan() { if currentLine == clientNumber { userID = scanner.Text() break } currentLine++ } if userID == "" { panic("no userID picked") } fTweets, err := os.Open("/home/simon/goCode/tweets/userTweets/" + userID) if err != nil { panic(err) } defer fTweets.Close() scanner = bufio.NewScanner(fTweets) lowerBound := timeBounds[round-1] upperBound := timeBounds[round] var tweet []byte for scanner.Scan() { //skips round 1, cause of 90% publisher rate if round == 1 { break } lineArr := strings.Split(scanner.Text(), ", \"hashtags\"") lineArr = strings.Split(lineArr[0], ": ") lineArr = strings.Split(lineArr[1], " \"") timestamp, _ := strconv.Atoi(lineArr[0]) //transforms timestamp to current time timestamp -= 1351742400 timestamp += startTime if float64(timestamp) > lowerBound && float64(timestamp) < upperBound { lineArr = strings.Split(scanner.Text(), "[\"") line := lineArr[1] lineArr = strings.Split(line, "\"]") line = lineArr[0] lineArr = strings.Split(line, ",") line = strings.Join(lineArr, "") topicLine := strings.Split(line, "\"") var topics []byte for index, topic := range topicLine { if index%2 == 1 { continue } if len(topics)+len(topic) > dataLength-10 { break } topics = append(topics, []byte(topic)[:]...) topics = append(topics, []byte(",")[0]) } if len(topics) == 0 { break } topics = topics[:len(topics)-1] tweet = append(tweet, topics...) tweet = append(tweet, []byte(";")[0]) r := mr.New(mr.NewSource(time.Now().UnixNano())) num := r.Intn(10000) if num == 0 { num = 1 } tweet = append(tweet, []byte(strconv.Itoa(num) + ";;")[:]...) //adds padding length := dataLength - len(tweet) padding := make([]byte, length) rand.Read(padding) tweet = append(tweet, padding...) publisherAmount++ return tweet } } tweet = make([]byte, dataLength) return tweet } func getTimeBounds() { timeBounds = make([]float64, 10000) timeBounds[0] = float64(time.Now().Unix()) for index := range timeBounds { if index == 0 { continue } timeBounds[index] = timeBounds[index-1] + speedUp*(3*maxTimePerRound.Seconds()+2) } } func getRandomTweet(clientNumber int) []byte { var tweet []byte r := mr.New(mr.NewSource(time.Now().UnixNano())) maxTopics := r.Intn(6) if maxTopics == 0 { maxTopics = 1 } maxInt := 100 topicNumbers := make([]int, maxTopics) //fills the array with unique random ascending values ranging from 0 to maxInt for i := 0; i < maxTopics; i++ { topicNumbers[i] = mr.Intn(maxInt) for j := 0; j < i; j++ { if topicNumbers[i] == topicNumbers[j] { i-- break } } } sort.Ints(topicNumbers) var topics []byte topicIndex := 0 for i := 0; i < len(topicNumbers)*2; i++ { if i%2 == 0 { topics = append(topics, byte(topicNumbers[topicIndex])) topicIndex++ } else if i != (len(topicNumbers)*2)-1 { topics = append(topics, []byte(",")[0]) } } topics = append(topics, []byte(";")[0]) num := r.Intn(100) if num == 0 { num = 1 } text := []byte(strconv.Itoa(num) + ";") tweet = append(tweet, topics...) tweet = append(tweet, text...) tweet = append(tweet, []byte(";")[0]) //adds padding length := dataLength - len(tweet) padding := make([]byte, length) rand.Read(padding) tweet = append(tweet, padding...) return tweet } //sends the array to the connection func writeTo(connection net.Conn, array []byte) { remainingLength := len(array) for remainingLength > 0 { if remainingLength >= mtu { _, err := connection.Write(array[:mtu]) if err != nil { panic(err) } array = array[mtu:] remainingLength -= mtu } else { _, err := connection.Write(array) if err != nil { panic(err) } remainingLength = 0 } } } //reads an array which is returned and of size "size" from the connection func readFrom(connection net.Conn, size int) []byte { var array []byte remainingSize := size for remainingSize > 0 { var err error toAppend := make([]byte, mtu) if remainingSize > mtu { _, err = connection.Read(toAppend) array = append(array, toAppend...) remainingSize -= mtu } else { _, err = connection.Read(toAppend[:remainingSize]) array = append(array, toAppend[:remainingSize]...) remainingSize = 0 } if err != nil { panic(err) } } return array } func intToByte(myInt int) (retBytes []byte) { retBytes = make([]byte, 4) retBytes[3] = byte((myInt >> 24) & 0xff) retBytes[2] = byte((myInt >> 16) & 0xff) retBytes[1] = byte((myInt >> 8) & 0xff) retBytes[0] = byte(myInt & 0xff) return } func byteToInt(myBytes []byte) (x int) { x = int(myBytes[3])<<24 + int(myBytes[2])<<16 + int(myBytes[1])<<8 + int(myBytes[0]) return }