package main /* #cgo CFLAGS: -O2 #cgo LDFLAGS: -lcrypto -lm #include "../c/dpf.h" #include "../c/okvClient.h" #include "../c/dpf.c" #include "../c/okvClient.c" */ import "C" //sssssssssssss import ( "2PPS/lib" "bufio" "bytes" "crypto/rand" "crypto/sha256" "crypto/tls" "encoding/json" "fmt" "math/big" mr "math/rand" "net" "os" "sort" "strconv" "strings" "sync" "time" "unsafe" "golang.org/x/crypto/nacl/box" ) type tweet struct { Topics []string Text string } const leader string = "127.0.0.1:4441" //needs to be changed at leader/follower/client at the same time const numClients = 500 //mylimit=8000 //sudo prlimit --nofile=$mylimit --pid $$; ulimit -n $mylimit const dataLength int = 256 const numThreads int = 12 //Maximum Transport Unit const mtu int = 1100 var dbWriteSize int var round int var topicList []string var archiveTopicList []string var neededSubscriptions int var publisherAmount int var timeBounds []int var speedUp int = 50 var maxTimePerRound time.Duration = 25 * time.Second var startTime int //todo! expand this for multiple clients var archiveInterests = make([]int, 1) var sharedSecret [numClients][2][32]byte = createSharedSecret() var wantsArchive = make([]byte, 1) var leaderPublicKey *[32]byte var followerPublicKey *[32]byte var clientPrivateKey [numClients]*[32]byte var clientPublicKey [numClients]*[32]byte func main() { wg := &sync.WaitGroup{} getTimeBounds() for i := 0; i < numClients; i++ { wg.Add(1) go client(i) time.Sleep(1 * time.Millisecond) } wg.Wait() } func client(clientNumber int) { generatedPublicKey, generatedPrivateKey, err := box.GenerateKey(rand.Reader) if err != nil { panic(err) } clientPrivateKey[clientNumber] = generatedPrivateKey clientPublicKey[clientNumber] = generatedPublicKey C.initializeCipher() //initializes the connection to the leader conf := &tls.Config{ InsecureSkipVerify: true, } leaderConn, err := tls.Dial("tcp", leader, conf) if err != nil { fmt.Println("clientNumber", clientNumber) panic(err) } leaderConn.SetDeadline(time.Time{}) //receives topics first so client can participate asap receiveTopicLists(leaderConn) //gets the public keys of both servers var tmpLeaderPubKey [32]byte _, err = leaderConn.Read(tmpLeaderPubKey[:]) if err != nil { panic(err) } leaderPublicKey = &tmpLeaderPubKey var tmpFollowerPubKey [32]byte _, err = leaderConn.Read(tmpFollowerPubKey[:]) if err != nil { panic(err) } followerPublicKey = &tmpFollowerPubKey //sends own public key writeTo(leaderConn, clientPublicKey[clientNumber][:]) neededSubscriptionsBytes := readFrom(leaderConn, 4) neededSubscriptions = byteToInt(neededSubscriptionsBytes) startTimeBytes := readFrom(leaderConn, 4) startTime = byteToInt(startTimeBytes) //setup ends above //while client is active he is always connected and has to participate for { //gets current phase phase := readFrom(leaderConn, 1) if phase[0] == 1 { //gets current dbWriteSize from leader dbWriteSizeBytes := readFrom(leaderConn, 4) dbWriteSize = byteToInt(dbWriteSizeBytes) //todo! put into tweet creation //roundAsBytes := readFrom(leaderConn, 4) roundAsBytes := make([]byte, 4) _, err = leaderConn.Read(roundAsBytes) if err != nil { panic(err) } round = byteToInt(roundAsBytes) if clientNumber == 0 { fmt.Println("Round ", round) } //request virtualAddress from leader via pirQuery encryptedQueryLeader, encryptedQueryFollower := createAuditPIRQuery(clientNumber) sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false) pos := receiveVirtualAddress(sharedSecret[clientNumber], leaderConn) tweet := getRealTweet(clientNumber) if clientNumber == numClients-1 { fmt.Println("publisherAmount", publisherAmount) } //prep the query dataSize := len(tweet) querySize := make([]byte, 4) cQuerySize := C.int(byteToInt(querySize)) var dpfQueryA *C.uchar var dpfQueryB *C.uchar C.prepQuery(C.int(pos), C.int(dbWriteSize), (*C.uchar)(&tweet[0]), C.int(dataSize), &cQuerySize, &dpfQueryA, &dpfQueryB) intQuerySize := int(cQuerySize) //byteToInt(querySize) //write the query queryAPlaintext := C.GoBytes(unsafe.Pointer(dpfQueryA), C.int(intQuerySize)) //encrypts queryA and appends it to message var nonce [24]byte //fill nonce with randomness _, err = rand.Read(nonce[:]) if err != nil { panic("couldn't get randomness for nonce!") } dpfQueryAEncrypted := box.Seal(nonce[:], queryAPlaintext, &nonce, leaderPublicKey, clientPrivateKey[clientNumber]) //encrypts queryB and appends it to message queryBPlaintext := C.GoBytes(unsafe.Pointer(dpfQueryB), C.int(intQuerySize)) //fill nonce with randomness _, err = rand.Read(nonce[:]) if err != nil { panic("couldn't get randomness for nonce!") } dpfQueryBEncrypted := box.Seal(nonce[:], queryBPlaintext, &nonce, followerPublicKey, clientPrivateKey[clientNumber]) //writes the dpfQuery to the leader dpfLengthBytes := intToByte(len(dpfQueryAEncrypted)) writeTo(leaderConn, dpfLengthBytes) writeTo(leaderConn, dpfQueryAEncrypted) writeTo(leaderConn, dpfQueryBEncrypted) C.free(unsafe.Pointer(dpfQueryA)) C.free(unsafe.Pointer(dpfQueryB)) } else if phase[0] == 3 { /* possible Values 0 : new client leader expects sharedSecrets, expects pirQuery 1 : update needed leader sends topicList, performs local update of sharedSecret, expects pirQuery 2 : no update needed nothing */ subPhase := readFrom(leaderConn, 1) var encryptedQueryLeader, encryptedQueryFollower []byte //first time participating if subPhase[0] == 0 { receiveTopicLists(leaderConn) encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(int(subPhase[0]), clientNumber) sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false) } //updates the topic list and what client is interested in if subPhase[0] == 1 { receiveTopicLists(leaderConn) //updates local secret for index := 0; index < 2; index++ { sharedSecret[clientNumber][index] = sha256.Sum256(sharedSecret[clientNumber][index][:]) } encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(int(subPhase[0]), clientNumber) sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false) } receiveTweets(sharedSecret[clientNumber], leaderConn, false, clientNumber) if len(archiveTopicList) > 0 { wantsArchive[0] = 0 //archive test } else { wantsArchive[0] = 0 } writeTo(leaderConn, wantsArchive) if wantsArchive[0] == 1 && len(archiveTopicList) > 0 { encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(-1, clientNumber) sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, true) receiveTweets(sharedSecret[clientNumber], leaderConn, true, clientNumber) } } else { fmt.Println("Phase", phase) panic("somethin went wrong") } } } //creates and sends the pirQuerys for each server func createPIRQuery(subPhase int, clientNumber int) ([]byte, []byte) { //later this will be taken from gui, this is only for testing topicsOFInterest := make([]int, 10) topicsOFInterest[0] = 0 topicsOFInterest[1] = 1 topicsOFInterest[9] = 1 archiveInterests[0] = 1 //todo! repeat for archive tmpNeededSubscriptions := neededSubscriptions if tmpNeededSubscriptions > len(topicList) { tmpNeededSubscriptions = len(topicList) } tmptopicsOfInterest := make([]int, len(topicList)) copy(tmptopicsOfInterest, topicsOFInterest) tmpTopicList := make([]string, len(topicList)) copy(tmpTopicList, topicList) if wantsArchive[0] == 1 && subPhase == -1 { tmpNeededSubscriptions = len(archiveInterests) if tmpNeededSubscriptions > len(archiveTopicList) { tmpNeededSubscriptions = len(archiveTopicList) } copy(tmptopicsOfInterest, archiveInterests) //archiveInterests from gui copy(tmpTopicList, archiveTopicList) } //creates fake topicsOfInterest if client is boooring if len(tmptopicsOfInterest) < tmpNeededSubscriptions && subPhase != -1 { tmptopicsOfInterest = addFakeInterests(len(tmpTopicList), tmptopicsOfInterest, false) } //pirQuery [topicsofinterest][serverAmount][topicAmount]byte pirQuerys := make([][][]byte, len(tmptopicsOfInterest)) for i := range pirQuerys { pirQuerys[i] = make([][]byte, 2) for j := range pirQuerys[i] { pirQuerys[i][j] = make([]byte, len(tmpTopicList)) } } //for leader //pirQuery will be filled with random bits for topic := range tmptopicsOfInterest { for index := range tmpTopicList { bit, err := rand.Int(rand.Reader, big.NewInt(2)) if err != nil { panic(err) } pirQuerys[topic][0][index] = byte(bit.Int64()) } } tmptopicsOfInterestBytes := make([]byte, tmpNeededSubscriptions) for index := range tmptopicsOfInterestBytes { if tmptopicsOfInterest[index] == 1 { tmptopicsOfInterestBytes[index] = 1 } } for topic := range tmptopicsOfInterest { for index := range tmpTopicList { if topic == index { if pirQuerys[topic][0][index] == 1 { pirQuerys[topic][1][index] = 0 } else { pirQuerys[topic][1][index] = 1 } } else { if pirQuerys[topic][0][index] == 0 { pirQuerys[topic][1][index] = 0 } else { pirQuerys[topic][1][index] = 1 } } } } //flattens the querys to be able to send them more efficently messagesFlattened := make([][]byte, 2) //adds the sharedSecret to the first pirQuery when first time participating if subPhase == 0 { for server := 0; server < 2; server++ { messagesFlattened[server] = append(messagesFlattened[server], sharedSecret[clientNumber][server][:]...) } } for server := range messagesFlattened { for topic := range pirQuerys { messagesFlattened[server] = append(messagesFlattened[server], pirQuerys[topic][server]...) } } var nonce [24]byte _, err := rand.Read(nonce[:]) if err != nil { panic("couldn't get randomness for nonce!") } encryptedQueryLeader := box.Seal(nonce[:], messagesFlattened[0], &nonce, leaderPublicKey, clientPrivateKey[clientNumber]) _, err = rand.Read(nonce[:]) if err != nil { panic("couldn't get randomness for nonce!") } encryptedQueryFollower := box.Seal(nonce[:], messagesFlattened[1], &nonce, followerPublicKey, clientPrivateKey[clientNumber]) return encryptedQueryLeader, encryptedQueryFollower } func sendQuerys(encryptedQueryLeader, encryptedQueryFollower []byte, leaderConn net.Conn, getArchive bool) { encryptedLength := len(encryptedQueryLeader) //sends the pirQuerysLength to the leader writeTo(leaderConn, intToByte(encryptedLength)) //sends the pirQuerys to the leader writeTo(leaderConn, encryptedQueryLeader) writeTo(leaderConn, encryptedQueryFollower) if getArchive { writeTo(leaderConn, intToByte(len(archiveInterests))) } } func receiveVirtualAddress(sharedSecret [2][32]byte, leaderConn net.Conn) int { virtualAddressByte := readFrom(leaderConn, 4) //xores the sharedSecret for h := 0; h < 2; h++ { for i := 0; i < 4; i++ { virtualAddressByte[i] = virtualAddressByte[i] ^ sharedSecret[h][i] } } return byteToInt(virtualAddressByte) } func receiveTweets(sharedSecret [2][32]byte, leaderConn net.Conn, getArchive bool, clientNumber int) { tmpNeededSubscriptions := neededSubscriptions if tmpNeededSubscriptions > len(topicList) { tmpNeededSubscriptions = len(topicList) } if getArchive { tmpNeededSubscriptions = len(archiveInterests) if tmpNeededSubscriptions > len(archiveTopicList) { tmpNeededSubscriptions = len(archiveTopicList) } } for i := 0; i < tmpNeededSubscriptions; i++ { //client receives tweets tweetsLengthBytes := readFrom(leaderConn, 4) tweetsLength := byteToInt(tweetsLengthBytes) tweets := readFrom(leaderConn, tweetsLength) //fmt.Println(tweets[:10]) //expand sharedSecret so it is of right length expandBy := len(tweets) / 32 expandedSharedSecrets := make([][]byte, 2) for i := 0; i < 2; i++ { for j := 0; j < expandBy; j++ { expandedSharedSecrets[i] = append(expandedSharedSecrets[i], sharedSecret[i][:]...) } } //xors the received messge into the message to display for i := 0; i < 2; i++ { lib.Xor(expandedSharedSecrets[i][:], tweets) } //fmt.Println("PubKey", clientPublicKey[clientNumber], "Bytes", tweets) index := strings.Index(string(tweets), ";;") if index != -1 && clientNumber == 0 { /* fmt.Println("Correct") textArr := strings.Split(string(tweets), ";;;;;;;;") text := string(tweets)[:len(textArr)-1] fmt.Println("Round", round, "Text", text[:5], "Length", len(tweets)) */ } else if index == -1 { fmt.Println("error") fmt.Println("round", round, "text:", string(tweets), "length", len(tweets)) return //panic("received text not of correct format") } } } //creates a shared secret for each server func createSharedSecret() [numClients][2][32]byte { var tmpSharedSecret [numClients][2][32]byte for i := 0; i < numClients; i++ { for j := 0; j < 2; j++ { _, err := rand.Read(tmpSharedSecret[i][j][:]) if err != nil { panic("couldn't get randomness for sharedSecret!") } } } return tmpSharedSecret } func createAuditPIRQuery(clientNumber int) ([]byte, []byte) { //pirQuery [serverAmount][dbWriteSize]byte pirQuerys := make([][]byte, 2) for i := range pirQuerys { pirQuerys[i] = make([]byte, dbWriteSize) } //for leader //pirQuery will be filled with random bits for index := range pirQuerys[0] { bit := mr.Intn(2) pirQuerys[0][index] = byte(bit) } copy(pirQuerys[1], pirQuerys[0]) //the positon the virtual address will be taken from pos := mr.Intn(dbWriteSize) pirQuerys[0][pos] = 1 pirQuerys[1][pos] = 0 //flattens the querys to be able to send them more efficently messagesFlattened := make([][]byte, 2) //adds the sharedSecret to the pirQuery for server := 0; server < 2; server++ { messagesFlattened[server] = append(messagesFlattened[server], sharedSecret[clientNumber][server][:]...) } for server := 0; server < 2; server++ { messagesFlattened[server] = append(messagesFlattened[server], pirQuerys[server][:]...) } var nonce [24]byte _, err := rand.Read(nonce[:]) if err != nil { panic("couldn't get randomness for nonce!") } encryptedQueryLeader := box.Seal(nonce[:], messagesFlattened[0], &nonce, leaderPublicKey, clientPrivateKey[clientNumber]) _, err = rand.Read(nonce[:]) if err != nil { panic("couldn't get randomness for nonce!") } encryptedQueryFollower := box.Seal(nonce[:], messagesFlattened[1], &nonce, followerPublicKey, clientPrivateKey[clientNumber]) return encryptedQueryLeader, encryptedQueryFollower } //generates a topicOfInterest array with random values func addFakeInterests(max int, topicsOfInterest []int, doAuditing bool) []int { tmpNeededSubscriptions := neededSubscriptions if tmpNeededSubscriptions > len(topicList) { tmpNeededSubscriptions = len(topicList) } fakeTopicsOfInterest := make([]int, tmpNeededSubscriptions) maxInt := max //fills the array with unique random ascending values ranging from 0 to max for i := 0; i < tmpNeededSubscriptions; i++ { fakeTopicsOfInterest[i] = mr.Intn(maxInt) for j := 0; j < i; j++ { if fakeTopicsOfInterest[i] == fakeTopicsOfInterest[j] { i-- break } } } if doAuditing { sort.Ints(fakeTopicsOfInterest) return fakeTopicsOfInterest } //adds unique and new random numbers to topicOfInterests until length is satisfied for _, number := range fakeTopicsOfInterest { if !inList(number, topicsOfInterest) { topicsOfInterest = append(topicsOfInterest, number) } if len(topicsOfInterest) == tmpNeededSubscriptions { break } } sort.Ints(topicsOfInterest) return topicsOfInterest } func inList(number int, list []int) bool { for _, element := range list { if element == number { return true } } return false } func receiveTopicLists(leaderConn net.Conn) { for i := 0; i < 2; i++ { topicListLength := readFrom(leaderConn, 4) recTopicList := readFrom(leaderConn, byteToInt(topicListLength)) var tmpTopicList []string arrayReader := bytes.NewReader(recTopicList[:]) json.NewDecoder(arrayReader).Decode(&tmpTopicList) if i == 0 { topicList = tmpTopicList } else { archiveTopicList = tmpTopicList } } } func getRealTweet(clientNumber int) []byte { fUserList, err := os.Open("/home/simon/goCode/tweets/userList") if err != nil { panic(err) } defer fUserList.Close() currentLine := 0 scanner := bufio.NewScanner(fUserList) userID := "" for scanner.Scan() { if currentLine == clientNumber { userID = scanner.Text() break } currentLine++ } if userID == "" { panic("no userID picked") } fTweets, err := os.Open("/home/simon/goCode/tweets/userTweets/" + userID) if err != nil { panic(err) } defer fTweets.Close() scanner = bufio.NewScanner(fTweets) lowerBound := timeBounds[round-1] upperBound := timeBounds[round] var tweet []byte for scanner.Scan() { lineArr := strings.Split(scanner.Text(), ", \"hashtags\"") lineArr = strings.Split(lineArr[0], ": ") lineArr = strings.Split(lineArr[1], " \"") timestamp, _ := strconv.Atoi(lineArr[0]) //transforms timestamp to current time timestamp -= 1351742400 timestamp += startTime if timestamp > lowerBound && timestamp < upperBound { lineArr = strings.Split(scanner.Text(), "[\"") line := lineArr[1] lineArr = strings.Split(line, "\"]") line = lineArr[0] lineArr = strings.Split(line, ",") line = strings.Join(lineArr, "") topicLine := strings.Split(line, "\"") var topics []byte for index, topic := range topicLine { if index%2 == 1 { continue } if len(topics)+len(topic) > dataLength-10 { break } topics = append(topics, []byte(topic)[:]...) topics = append(topics, []byte(",")[0]) } topics = topics[:len(topics)-1] //fmt.Println(string(topics)) tweet = append(tweet, topics...) tweet = append(tweet, []byte(";")[0]) r := mr.New(mr.NewSource(time.Now().UnixNano())) num := r.Intn(10000) if num == 0 { num = 1 } tweet = append(tweet, []byte(strconv.Itoa(num) + ";;")[:]...) //fmt.Println("tweet", string(tweet)) //adds padding length := dataLength - len(tweet) padding := make([]byte, length) rand.Read(padding) tweet = append(tweet, padding...) publisherAmount++ return tweet } } tweet = make([]byte, dataLength) return tweet } func getTimeBounds() { timeBounds = make([]int, 10000) timeBounds[0] = int(time.Now().Unix()) for index := range timeBounds { if index == 0 { continue } timeBounds[index] = timeBounds[index-1] + speedUp*(int(3*maxTimePerRound.Seconds())+2) } } func getRandomTweet(clientNumber int) []byte { var tweet []byte r := mr.New(mr.NewSource(time.Now().UnixNano())) maxTopics := r.Intn(6) if maxTopics == 0 { maxTopics = 1 } maxInt := 10000 topicNumbers := make([]int, maxTopics) //fills the array with unique random ascending values ranging from 0 to maxInt for i := 0; i < maxTopics; i++ { topicNumbers[i] = mr.Intn(maxInt) for j := 0; j < i; j++ { if topicNumbers[i] == topicNumbers[j] { i-- break } } } sort.Ints(topicNumbers) //fmt.Println("topicNumbers", topicNumbers) var topics []byte topicIndex := 0 for i := 0; i < len(topicNumbers)*2; i++ { if i%2 == 0 { topics = append(topics, byte(topicNumbers[topicIndex])) topicIndex++ } else if i != (len(topicNumbers)*2)-1 { topics = append(topics, []byte(",")[0]) } } topics = append(topics, []byte(";")[0]) num := r.Intn(100) if num == 0 { num = 1 } text := []byte(strconv.Itoa(num) + ";") tweet = append(tweet, topics...) tweet = append(tweet, text...) tweet = append(tweet, []byte(";")[0]) //fmt.Println("writing", string(text)) //fmt.Println(topicNumbers) if len(tweet) > 32 { fmt.Println("lenlen", len(tweet)) } //adds padding length := dataLength - len(tweet) padding := make([]byte, length) rand.Read(padding) tweet = append(tweet, padding...) return tweet } //sends the array to the connection func writeTo(connection net.Conn, array []byte) { remainingLength := len(array) for remainingLength > 0 { if remainingLength >= mtu { _, err := connection.Write(array[:mtu]) if err != nil { panic(err) } array = array[mtu:] remainingLength -= mtu } else { _, err := connection.Write(array) if err != nil { panic(err) } remainingLength = 0 } } } //reads an array which is returned and of size "size" from the connection func readFrom(connection net.Conn, size int) []byte { var array []byte remainingSize := size for remainingSize > 0 { var err error toAppend := make([]byte, mtu) if remainingSize > mtu { _, err = connection.Read(toAppend) array = append(array, toAppend...) remainingSize -= mtu } else { _, err = connection.Read(toAppend[:remainingSize]) array = append(array, toAppend[:remainingSize]...) remainingSize = 0 } if err != nil { panic(err) } } return array } func intToByte(myInt int) (retBytes []byte) { retBytes = make([]byte, 4) retBytes[3] = byte((myInt >> 24) & 0xff) retBytes[2] = byte((myInt >> 16) & 0xff) retBytes[1] = byte((myInt >> 8) & 0xff) retBytes[0] = byte(myInt & 0xff) return } func byteToInt(myBytes []byte) (x int) { x = int(myBytes[3])<<24 + int(myBytes[2])<<16 + int(myBytes[1])<<8 + int(myBytes[0]) return }