Browse Source

various bugfixes

Simon 2 years ago
parent
commit
6f8d1caed1
5 changed files with 129 additions and 115 deletions
  1. 2 2
      c/okvClient.c
  2. 56 59
      client/client.go
  3. 20 26
      follower/follower.go
  4. 31 27
      leader/leader.go
  5. 20 1
      lib/databaseRead.go

+ 2 - 2
c/okvClient.c

@@ -17,7 +17,7 @@ void initializeCipher() {
 
     if(!(ctx = EVP_CIPHER_CTX_new())) 
         printf("errors occured in creating context\n");
-    //todo! use random key, this is only for testing
+    //should use random key for real world application, this is only for testing
     unsigned char *aeskey = (unsigned char*) "0123456789123456";
     if(1 != EVP_EncryptInit_ex(ctx, EVP_aes_128_ecb(), NULL, aeskey, NULL))
         printf("errors occured in init\n");
@@ -26,7 +26,7 @@ void initializeCipher() {
 
 uint128_t prepQuery(int pos, int dbSize, uint8_t *dataToWrite, int dataSize, int *querySize, uint8_t **dpfQueryA, uint8_t **dpfQueryB){
 
-    position = pos; //rand() % dbSize; //todo! adjust this for different clients, rand gives 83
+    position = pos;
     *querySize = 1 + 16 + 1 + 18 * 128 + dataSize;
     
     genDPF(ctx, 128, position, dataSize, dataToWrite, dpfQueryA, dpfQueryB);

+ 56 - 59
client/client.go

@@ -10,7 +10,7 @@ package main
 */
 import "C"
 
-//sssssssssss
+//sssssssssssss
 import (
 	lib "2PPS/lib"
 	"bytes"
@@ -23,6 +23,7 @@ import (
 	mr "math/rand"
 	"net"
 	"sort"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -40,7 +41,11 @@ const leader string = "127.0.0.1:4441"
 
 //needs to be changed at leader/follower/client at the same time
 const neededSubscriptions = 1
-const numClients = 1
+const numClients = 2
+const dataLength int = 64
+const numThreads int = 12
+
+var dbWriteSize int = 4
 
 var topicList []string
 var archiveTopicList []string
@@ -51,66 +56,22 @@ var sharedSecret [numClients][2][32]byte = createSharedSecret()
 
 var wantsArchive = make([]byte, 1)
 
-var dataLen int = 64
-var numThreads int = 12
-var dbWriteSize int = 4
-
 var leaderPublicKey *[32]byte
 var followerPublicKey *[32]byte
 var clientPrivateKey *[32]byte
 var clientPublicKey *[32]byte
 
 func main() {
-
-	//creates test tweets
-	tweets := make([][]byte, numClients)
-	for i := range tweets {
-		tweets[i] = make([]byte, dataLen)
-	}
-
-	for i := 0; i < numClients; i++ {
-		var tweet []byte
-		if i == 0 {
-			topics := []byte("house, mouse;")
-			text := []byte("I am a house in a mouse;")
-			tweet = append(tweet, topics...)
-			tweet = append(tweet, text...)
-		}
-		tweet = append(tweet, []byte(";")[0])
-
-		//adds padding
-		length := dataLen - len(tweet)
-		padding := make([]byte, length)
-		rand.Read(padding)
-		tweet = append(tweet, padding...)
-
-		tweets[i] = tweet
-	}
-
 	wg := &sync.WaitGroup{}
 
 	for i := 0; i < numClients; i++ {
 		wg.Add(1)
-		go client(tweets[i], i)
+		go client(i)
 	}
 	wg.Wait()
 }
 
-func client(tweet []byte, clientNumber int) {
-
-	/*
-		if len(os.Args) != 4 {
-			fmt.Println("try again with: numThreads, dataLength, numRows")
-			return
-		}
-
-		//input when executing is follower amount
-		serverAmount, _ = strconv.Atoi(os.Args[1])
-		serverAmount++
-		dataLen, _ = strconv.Atoi(os.Args[2])
-		numThreads, _ = strconv.Atoi(os.Args[3])
-		dbSize, _ = strconv.Atoi(os.Args[4])
-	*/
+func client(clientNumber int) {
 
 	generatedPublicKey, generatedPrivateKey, err := box.GenerateKey(rand.Reader)
 	if err != nil {
@@ -181,6 +142,8 @@ func client(tweet []byte, clientNumber int) {
 			sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false)
 			pos := receiveVirtualAddress(sharedSecret[clientNumber], leaderConn)
 
+			tweet := getTweet(clientNumber)
+
 			//prep the query
 			dataSize := len(tweet)
 			querySize := make([]byte, 4)
@@ -294,11 +257,17 @@ func createPIRQuery(subPhase int, clientNumber int) ([]byte, []byte) {
 	copy(tmptopicsOfInterest, topicsOfInterest)
 
 	tmpNeededSubscriptions := neededSubscriptions
+	if tmpNeededSubscriptions > len(topicList) {
+		tmpNeededSubscriptions = len(topicList)
+	}
 
 	tmpTopicList := make([]string, len(topicList))
 	copy(tmpTopicList, topicList)
 	if wantsArchive[0] == 1 && subPhase == -1 {
 		tmpNeededSubscriptions = len(archiveInterests)
+		if tmpNeededSubscriptions > len(archiveTopicList) {
+			tmpNeededSubscriptions = len(archiveTopicList)
+		}
 		copy(tmptopicsOfInterest, archiveInterests) //archiveInterests from gui
 		copy(tmpTopicList, archiveTopicList)
 	}
@@ -310,12 +279,11 @@ func createPIRQuery(subPhase int, clientNumber int) ([]byte, []byte) {
 
 	//creates fake topicsOfInterest if client is boooring
 	if len(tmptopicsOfInterest) < tmpNeededSubscriptions && subPhase != -1 {
-		tmptopicsOfInterest = addFakeInterests(neededSubscriptions, len(tmpTopicList), tmptopicsOfInterest, false)
+		tmptopicsOfInterest = addFakeInterests(len(tmpTopicList), tmptopicsOfInterest, false)
 	}
 
 	for topic, position := range tmptopicsOfInterest {
-		//change used to be positon-1
-		topicsOfInterestAsBytes[topic][position-1] = 1
+		topicsOfInterestAsBytes[topic][position] = 1
 	}
 
 	//pirQuery [serverAmount][topicsofinterest][topicAmount]byte
@@ -414,8 +382,14 @@ func receiveVirtualAddress(sharedSecret [2][32]byte, leaderConn net.Conn) int {
 func receiveTweets(sharedSecret [2][32]byte, leaderConn net.Conn, getArchive bool) int {
 
 	tmpNeededSubscriptions := neededSubscriptions
+	if tmpNeededSubscriptions > len(topicList) {
+		tmpNeededSubscriptions = len(topicList)
+	}
 	if getArchive {
 		tmpNeededSubscriptions = len(archiveInterests)
+		if tmpNeededSubscriptions > len(archiveTopicList) {
+			tmpNeededSubscriptions = len(archiveTopicList)
+		}
 	}
 	for i := 0; i < tmpNeededSubscriptions; i++ {
 		//client receives tweets
@@ -441,10 +415,8 @@ func receiveTweets(sharedSecret [2][32]byte, leaderConn net.Conn, getArchive boo
 
 		//tweets can be displayed
 		split := strings.Split(string(tweets), ";")
-		topic := split[0]
-		text := split[1]
-		fmt.Println("Topics: ", topic)
-		fmt.Println("Text: ", text)
+		text := split[:len(split)-1]
+		fmt.Println(text)
 	}
 	return 0
 }
@@ -512,12 +484,17 @@ func createAuditPIRQuery(clientNumber int) ([]byte, []byte) {
 }
 
 //generates a topicOfInterest array with random values
-func addFakeInterests(length, max int, topicsOfInterest []int, doAuditing bool) []int {
-	fakeTopicsOfInterest := make([]int, length)
+func addFakeInterests(max int, topicsOfInterest []int, doAuditing bool) []int {
+	tmpNeededSubscriptions := neededSubscriptions
+	if tmpNeededSubscriptions > len(topicList) {
+		tmpNeededSubscriptions = len(topicList)
+	}
+
+	fakeTopicsOfInterest := make([]int, tmpNeededSubscriptions)
 	maxInt := max
 
 	//fills the array with unique random ascending values ranging from 0 to max
-	for i := 0; i < length; i++ {
+	for i := 0; i < tmpNeededSubscriptions; i++ {
 		fakeTopicsOfInterest[i] = mr.Intn(maxInt)
 
 		for j := 0; j < i; j++ {
@@ -538,7 +515,7 @@ func addFakeInterests(length, max int, topicsOfInterest []int, doAuditing bool)
 		if !inList(number, topicsOfInterest) {
 			topicsOfInterest = append(topicsOfInterest, number)
 		}
-		if len(topicsOfInterest) == neededSubscriptions {
+		if len(topicsOfInterest) == tmpNeededSubscriptions {
 			break
 		}
 	}
@@ -574,6 +551,26 @@ func receiveTopicLists(leaderConn net.Conn) {
 	}
 }
 
+func getTweet(clientNumber int) []byte {
+	var tweet []byte
+
+	r := mr.New(mr.NewSource(time.Now().UnixNano()))
+
+	topics := []byte("house, mouse;")
+	text := []byte("I am a house in a mouse " + strconv.Itoa(r.Intn(100)) + ";")
+	tweet = append(tweet, topics...)
+	tweet = append(tweet, text...)
+	tweet = append(tweet, []byte(";")[0])
+
+	//adds padding
+	length := dataLength - len(tweet)
+	padding := make([]byte, length)
+	rand.Read(padding)
+	tweet = append(tweet, padding...)
+
+	return tweet
+}
+
 //sends the array to the connection
 func writeToConn(connection net.Conn, array []byte) {
 	_, err := connection.Write(array)

+ 20 - 26
follower/follower.go

@@ -8,7 +8,7 @@ package main
 //#include "../c/okv.c"
 import "C"
 
-//ssssssssssssss
+//sssssssssssssssss
 
 import (
 	"2PPS/lib"
@@ -30,11 +30,6 @@ import (
 	"golang.org/x/crypto/nacl/box"
 )
 
-const leader string = "127.0.0.1:4442"
-
-//needs to be changed at leader/follower/client at the same time
-const neededSubscriptions = 1
-
 //this stores all neccessary information for each client
 type clientKeys struct {
 	SharedSecret [32]byte
@@ -50,17 +45,17 @@ var followerPrivateKey *[32]byte
 var followerPublicKey *[32]byte
 var leaderPublicKey *[32]byte
 
-var numThreads int = 12
+//needs to be changed at leader/follower/client at the same time
+const neededSubscriptions = 1
+const dataLength = 64
+const numThreads = 12
 
-//has to be dividable by 32
-var dataLength int = 64
 var dbWriteSize int = 4
+var maxTimePerRound time.Duration = 5 * time.Second
 
 var round int = 1
 var startTime time.Time
 
-var maxTimePerRound time.Duration = 5 * time.Second
-
 func main() {
 
 	generatedPublicKey, generatedPrivateKey, err := box.GenerateKey(rand.Reader)
@@ -71,16 +66,6 @@ func main() {
 	followerPrivateKey = generatedPrivateKey
 	followerPublicKey = generatedPublicKey
 
-	/*
-		if len(os.Args) != 4 {
-			fmt.Println("try again: numThreads, dataLength, numRows")
-			return
-		}
-		numThreads, _ = strconv.Atoi(os.Args[2])
-		dataLength, _ = strconv.Atoi(os.Args[3])
-		numRows, _ = strconv.Atoi(os.Args[4])
-	*/
-
 	C.initializeServer(C.int(numThreads))
 
 	followerConnectionPrivateKey, err := rsa.GenerateKey(rand.Reader, 2048)
@@ -127,7 +112,7 @@ func main() {
 	fmt.Println("start leader")
 
 	//listens for leader
-	lnLeader, err := tls.Listen("tcp", ":4443", config)
+	lnLeader, err := tls.Listen("tcp", ":4442", config)
 	if err != nil {
 		panic(err)
 	}
@@ -153,7 +138,7 @@ func main() {
 	//setup ends here
 
 	//locks access to DB
-	var m sync.Mutex
+	m := &sync.RWMutex{}
 	wg := &sync.WaitGroup{}
 
 	for {
@@ -217,7 +202,7 @@ func main() {
 			}
 			leaderConnection.SetDeadline(time.Time{})
 			startTime = time.Now()
-			go phase3(leaderConnection, wg)
+			go phase3(leaderConnection, wg, m)
 		}
 
 		wg.Wait()
@@ -229,7 +214,7 @@ func main() {
 	}
 }
 
-func phase1(id int, leaderWorkerConnection net.Conn, m sync.Mutex, wg *sync.WaitGroup, virtualAddresses []int) {
+func phase1(id int, leaderWorkerConnection net.Conn, m *sync.RWMutex, wg *sync.WaitGroup, virtualAddresses []int) {
 
 	gotClient := make([]byte, 1)
 
@@ -258,11 +243,16 @@ func phase1(id int, leaderWorkerConnection net.Conn, m sync.Mutex, wg *sync.Wait
 		}
 		clientPublicKey = &tmpClientPublicKey
 
+		m.RLock()
 		clientKeys := clientData[tmpClientPublicKey]
+		m.RUnlock()
+
 		clientKeys, pirQuery := handlePirQuery(clientKeys, leaderWorkerConnection, 0, tmpClientPublicKey, true)
 		getSendVirtualAddress(pirQuery[0], virtualAddresses, clientKeys.SharedSecret, leaderWorkerConnection)
 
+		m.Lock()
 		clientData[*clientPublicKey] = clientKeys
+		m.Unlock()
 
 		//gets dpfQuery from leader
 		dpfLengthBytes := readFromConn(leaderWorkerConnection, 4)
@@ -466,7 +456,7 @@ func addTestTweets() {
 	lib.NewEntries(tweets, 0)
 }
 
-func phase3(leaderWorkerConnection net.Conn, wg *sync.WaitGroup) {
+func phase3(leaderWorkerConnection net.Conn, wg *sync.WaitGroup, m *sync.RWMutex) {
 
 	gotClient := make([]byte, 1)
 
@@ -488,7 +478,9 @@ func phase3(leaderWorkerConnection net.Conn, wg *sync.WaitGroup) {
 		}
 
 		//gets the client data
+		m.RLock()
 		clientKeys := clientData[clientPublicKey]
+		m.RUnlock()
 
 		if subPhase[0] == 0 || subPhase[0] == 1 {
 			clientKeys, _ = handlePirQuery(clientKeys, leaderWorkerConnection, int(subPhase[0]), clientPublicKey, false)
@@ -505,7 +497,9 @@ func phase3(leaderWorkerConnection net.Conn, wg *sync.WaitGroup) {
 		}
 
 		//saves clientKeys
+		m.Lock()
 		clientData[clientPublicKey] = clientKeys
+		m.Unlock()
 	}
 }
 

+ 31 - 27
leader/leader.go

@@ -8,7 +8,7 @@ package main
 //#include "../c/okv.c"
 import "C"
 
-//ssssssssssssss
+//ssssssssssssssss
 
 import (
 	"crypto/rand"
@@ -44,43 +44,32 @@ type clientKeys struct {
 
 var clientData = make(map[net.Addr]clientKeys)
 
-var leaderPrivateKey *[32]byte
+const follower string = "127.0.0.1:4442"
 
+var leaderPrivateKey *[32]byte
 var leaderPublicKey *[32]byte
-
 var followerPublicKey *[32]byte
 
-// every roundsBeforeUpdate the client updates his pirQuery, the sharedSecrets are updated locally
-const roundsBeforeUpdate int = 1
-
-const follower string = "127.0.0.1:4443"
-
-const maxNumberOfClients = 1000
-
-//needs to be changed at leader/follower/client at the same time
-const neededSubscriptions = 1
+const maxNumberOfClients = 10000000
 
 var topicList []byte
 var topicAmount int
 var archiveTopicAmount int
 
-//works on my machine
-var numThreads = 12
-
-//has to be dividable by 32
-var dataLength int = 64
+// every roundsBeforeUpdate the client updates his pirQuery
+const roundsBeforeUpdate = 5
+const neededSubscriptions = 1
+const numThreads = 12
+const dataLength = 64
 
-//this needs to be adjusted peridodically
 var dbWriteSize int = 4
+var maxTimePerRound time.Duration = 5 * time.Second
 
 //counts the number of rounds
 var round int = 1
 
 var startTime time.Time
 
-//adjust this for follower aswell
-var maxTimePerRound time.Duration = 5 * time.Second
-
 //channel for goroutine communication with clients
 var phase1Channel = make(chan net.Conn, maxNumberOfClients)
 var phase3Channel = make(chan net.Conn, maxNumberOfClients)
@@ -94,6 +83,9 @@ var publisherHistory [publisherRounds]int
 //todo! handle client dc during phase1/3
 func main() {
 
+	//prevents race conditions for wrtiting
+	m := &sync.RWMutex{}
+
 	generatedPublicKey, generatedPrivateKey, err := box.GenerateKey(rand.Reader)
 	if err != nil {
 		panic(err)
@@ -230,7 +222,9 @@ func main() {
 			var emptyArray [32]byte
 			var emptyByteArray [][]byte
 			keys := clientKeys{0, clientPublicKey, emptyArray, emptyByteArray}
+			m.Lock()
 			clientData[remoteAddress] = keys
+			m.Unlock()
 
 			phase1Channel <- clientConnection
 		}
@@ -241,9 +235,6 @@ func main() {
 	//the current phase
 	phase := make([]byte, 1)
 
-	//locks access to DB
-	var m sync.Mutex
-
 	for {
 
 		//phase1
@@ -311,7 +302,7 @@ func main() {
 			}
 			followerConnection.SetDeadline(time.Time{})
 
-			go phase3(id, phase, followerConnection, wg, startTime)
+			go phase3(id, phase, followerConnection, wg, startTime, m)
 		}
 
 		wg.Wait()
@@ -322,7 +313,7 @@ func main() {
 	}
 }
 
-func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGroup, m sync.Mutex, startTime time.Time, virtualAddresses []int) {
+func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGroup, m *sync.RWMutex, startTime time.Time, virtualAddresses []int) {
 
 	roundAsBytes := intToByte(round)
 	gotClient := make([]byte, 1)
@@ -346,7 +337,9 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		writeToConn(followerConnection, gotClient)
 
 		//sends clients publicKey to follower
+		m.RLock()
 		clientPublicKey := clientData[clientConnection.RemoteAddr()].PublicKey
+		m.RUnlock()
 		writeToConn(followerConnection, clientPublicKey[:])
 
 		//setup the worker-specific db
@@ -365,11 +358,15 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		//tells client current round
 		writeToConn(clientConnection, roundAsBytes)
 
+		m.RLock()
 		var clientKeys = clientData[clientConnection.RemoteAddr()]
+		m.RUnlock()
 		clientKeys, pirQuery := handlePirQuery(clientKeys, clientConnection, followerConnection, 0, true)
 		getSendVirtualAddress(pirQuery[0], virtualAddresses, clientKeys.SharedSecret, clientConnection, followerConnection)
 
+		m.Lock()
 		clientData[clientConnection.RemoteAddr()] = clientKeys
+		m.Unlock()
 
 		//accept dpfQuery from client
 		dpfLengthBytes := make([]byte, 4)
@@ -630,7 +627,7 @@ func addTestTweets() {
 }
 
 //opti! mb it is quicker to send updated topicLists to clients first so pirQuerys are ready
-func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGroup, startTime time.Time) {
+func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGroup, startTime time.Time, m *sync.RWMutex) {
 
 	gotClient := make([]byte, 1)
 	gotClient[0] = 0
@@ -667,8 +664,11 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		subPhase := make([]byte, 1)
 
 		//gets the data for the current client
+		m.RLock()
 		var clientKeys = clientData[clientConnection.RemoteAddr()]
+		m.RUnlock()
 		var roundsParticipating = clientKeys.roundsParticipating
+
 		//client participates for the first time
 		if roundsParticipating == 0 {
 			subPhase[0] = 0
@@ -691,7 +691,9 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		clientKeys.roundsParticipating = roundsParticipating + 1
 
 		//declaring variables here to prevent dupclicates later
+		m.RLock()
 		var sharedSecret [32]byte = clientData[clientConnection.RemoteAddr()].SharedSecret
+		m.RUnlock()
 
 		if subPhase[0] == 0 {
 			sendTopicLists(clientConnection)
@@ -719,7 +721,9 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		}
 
 		//saves all changes for client
+		m.Lock()
 		clientData[clientConnection.RemoteAddr()] = clientKeys
+		m.Unlock()
 
 		phase1Channel <- clientConnection
 

+ 20 - 1
lib/databaseRead.go

@@ -23,7 +23,7 @@ var archive = make(map[string][]Tweet)
 //has to be dividable by 32
 var minimumBlockSize int
 
-const roundsBeforeArchiving = 3
+const roundsBeforeArchiving = 300
 
 var topicList []string
 
@@ -183,4 +183,23 @@ func CleanUpdbR(round int) {
 		dbR[j] = tweets
 	}
 	NewEntries(tweetsToArchive, 1)
+
+	//redoes the whole dbR to correct pointers
+	var tweetsToMain []Tweet
+
+	for i := range dbR {
+		tweets := dbR[i]
+		for _, tweet := range tweets {
+			if tweet.Text != "" {
+				tweetsToMain = append(tweetsToMain, tweet)
+			}
+		}
+	}
+
+	dbR = nil
+	dbR = make(map[string][]Tweet)
+	topicList = nil
+
+	NewEntries(tweetsToMain, 0)
+
 }