Browse Source

started work on new auditing

Simon 2 years ago
parent
commit
35fd7141eb
4 changed files with 191 additions and 56 deletions
  1. 82 36
      client/client.go
  2. 16 5
      follower/follower.go
  3. 92 15
      leader/leader.go
  4. 1 0
      lib/databaseRead.go

+ 82 - 36
client/client.go

@@ -75,9 +75,10 @@ func main() {
 			tweet = append(tweet, topics...)
 			tweet = append(tweet, text...)
 		}
-
+		tweet = append(tweet, []byte(";")[0])
 		length := len(tweet)
 		for i := length; i < dataLen; i++ {
+			//todo! replace with random chars
 			tweet = append(tweet, []byte(";")[0])
 		}
 		tweets[i] = tweet
@@ -182,6 +183,12 @@ func client(tweet []byte, clientNumber int) {
 				panic(err)
 			}
 
+			//request virtualAddress from leader via pirQuery
+			encryptedQueryLeader, encryptedQueryFollower := createAuditPIRQuery(clientNumber)
+			sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false)
+			pos := receiveTweets(sharedSecret[clientNumber], leaderConn, clientNumber, false, true)
+			fmt.Println(pos)
+
 			//prep the query
 			dataSize := len(tweet)
 			querySize := make([]byte, 4)
@@ -191,7 +198,7 @@ func client(tweet []byte, clientNumber int) {
 			var dpfQueryB *C.uchar
 
 			//change
-			C.prepQuery(C.int(1), C.int(dbWriteSize), (*C.uchar)(&tweet[0]), C.int(dataSize), &cQuerySize, &dpfQueryA, &dpfQueryB)
+			C.prepQuery(C.int(pos), C.int(dbWriteSize), (*C.uchar)(&tweet[0]), C.int(dataSize), &cQuerySize, &dpfQueryA, &dpfQueryB)
 
 			intQuerySize := int(cQuerySize) //byteToInt(querySize)
 
@@ -275,7 +282,7 @@ func client(tweet []byte, clientNumber int) {
 
 			}
 
-			receiveTweets(sharedSecret[clientNumber], leaderConn, clientNumber, false)
+			receiveTweets(sharedSecret[clientNumber], leaderConn, clientNumber, false, false)
 
 			if len(archiveTopicList) > 0 {
 				wantsArchive[0] = 0 //archive test
@@ -291,7 +298,7 @@ func client(tweet []byte, clientNumber int) {
 			if wantsArchive[0] == 1 && len(archiveTopicList) > 0 {
 				encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(-1, clientNumber)
 				sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, true)
-				receiveTweets(sharedSecret[clientNumber], leaderConn, clientNumber, true)
+				receiveTweets(sharedSecret[clientNumber], leaderConn, clientNumber, true, false)
 			}
 
 		} else {
@@ -328,31 +335,14 @@ func createPIRQuery(subPhase int, clientNumber int) ([]byte, []byte) {
 
 	//creates fake topicsOfInterest if client is boooring
 	if len(tmptopicsOfInterest) < tmpNeededSubscriptions && subPhase != -1 {
-		tmptopicsOfInterest = addFakeInterests(tmptopicsOfInterest)
+		tmptopicsOfInterest = addFakeInterests(neededSubscriptions, len(tmpTopicList), tmptopicsOfInterest, false)
 	}
 
 	for topic, position := range tmptopicsOfInterest {
-		topicsOfInterestAsBytes[topic][position-1] = 1
+		//change used to be positon-1
+		topicsOfInterestAsBytes[topic][position] = 1
 	}
 
-	//this for N servers for one topic
-	/* interested in topic 3, 6 servers
-	this needs to be repeated for every topic client is interested in
-
-	wanted
-	[0, 0, 1]
-
-	random creation of 1 pirQuery
-	[0, 1, 0]
-
-	manual creation of 1 pirQuery
-	[0, 1, 1]
-
-	xor result
-	[0, 0, 1]
-
-	*/
-
 	//pirQuery [serverAmount][topicsofinterest][topicAmount]byte
 	pirQuerys := make([][][]byte, 2)
 	for i := range pirQuerys {
@@ -445,12 +435,15 @@ func sendQuerys(encryptedQueryLeader, encryptedQueryFollower []byte, leaderConn
 	}
 }
 
-func receiveTweets(sharedSecret [2][32]byte, leaderConn net.Conn, clientNumber int, getArchive bool) {
+func receiveTweets(sharedSecret [2][32]byte, leaderConn net.Conn, clientNumber int, getArchive, doAuditing bool) int {
 
 	tmpNeededSubscriptions := neededSubscriptions
 	if getArchive {
 		tmpNeededSubscriptions = len(archiveInterests)
 	}
+	if doAuditing {
+		tmpNeededSubscriptions = 1
+	}
 	for i := 0; i < tmpNeededSubscriptions; i++ {
 		//client receives tweets
 		tweetsLengthBytes := make([]byte, 4)
@@ -475,7 +468,6 @@ func receiveTweets(sharedSecret [2][32]byte, leaderConn net.Conn, clientNumber i
 			for j := 0; j < expandBy; j++ {
 				expandedSharedSecrets[i] = append(expandedSharedSecrets[i], sharedSecret[i][:]...)
 			}
-			//fmt.Println(expandedSharedSecrets[i])
 		}
 
 		//xors the received messge into the message to display
@@ -485,7 +477,12 @@ func receiveTweets(sharedSecret [2][32]byte, leaderConn net.Conn, clientNumber i
 
 		//tweets can be displayed
 		fmt.Println("final result: ", string(tweets))
+
+		if doAuditing {
+			return byteToInt(tweets)
+		}
 	}
+	return 0
 }
 
 //creates a shared secret for each server
@@ -503,15 +500,61 @@ func createSharedSecret() [numClients][2][32]byte {
 	return tmpSharedSecret
 }
 
+func createAuditPIRQuery(clientNumber int) ([]byte, []byte) {
+	//pirQuery [serverAmount][dbWriteSize]byte
+	pirQuerys := make([][]byte, 2)
+	for i := range pirQuerys {
+		pirQuerys[i] = make([]byte, dbWriteSize)
+	}
+
+	//for leader
+	//pirQuery will be filled with random bits
+	for index := range pirQuerys[0] {
+		bit := mr.Intn(2)
+		pirQuerys[0][index] = byte(bit)
+	}
+	copy(pirQuerys[1], pirQuerys[0])
+
+	//the positon the virtual address will be taken from
+	pos := mr.Intn(dbWriteSize)
+	pirQuerys[0][pos] = 1
+	pirQuerys[1][pos] = 0
+
+	//flattens the querys to be able to send them more efficently
+	messagesFlattened := make([][]byte, 2)
+	//adds the sharedSecret to the pirQuery
+	for server := 0; server < 2; server++ {
+		messagesFlattened[server] = append(messagesFlattened[server], sharedSecret[clientNumber][server][:]...)
+	}
+
+	for server := 0; server < 2; server++ {
+		messagesFlattened[server] = append(messagesFlattened[server], pirQuerys[server][:]...)
+	}
+
+	var nonce [24]byte
+	_, err := rand.Read(nonce[:])
+	if err != nil {
+		panic("couldn't get randomness for nonce!")
+	}
+	encryptedQueryLeader := box.Seal(nonce[:], messagesFlattened[0], &nonce, leaderPublicKey, clientPrivateKey)
+
+	_, err = rand.Read(nonce[:])
+	if err != nil {
+		panic("couldn't get randomness for nonce!")
+	}
+	encryptedQueryFollower := box.Seal(nonce[:], messagesFlattened[1], &nonce, followerPublicKey, clientPrivateKey)
+
+	return encryptedQueryLeader, encryptedQueryFollower
+}
+
 //generates a topicOfInterest array with random values
-//todo! fix
-func addFakeInterests(topicsOfInterest []int) []int {
-	fakeTopicsOfInterest := make([]int, neededSubscriptions)
-	maxInt := neededSubscriptions
+func addFakeInterests(length, max int, topicsOfInterest []int, doAuditing bool) []int {
+	fakeTopicsOfInterest := make([]int, length)
+	maxInt := max
 
-	//fills the array with unique random ascending values in range with len(topicList)
-	for i := 0; i < neededSubscriptions; i++ {
-		fakeTopicsOfInterest[i] = mr.Intn(maxInt) + 1
+	//fills the array with unique random ascending values ranging from 0 to max
+	for i := 0; i < length; i++ {
+		fakeTopicsOfInterest[i] = mr.Intn(maxInt)
 
 		for j := 0; j < i; j++ {
 			if fakeTopicsOfInterest[i] == fakeTopicsOfInterest[j] {
@@ -520,8 +563,11 @@ func addFakeInterests(topicsOfInterest []int) []int {
 			}
 		}
 	}
-	sort.Ints(fakeTopicsOfInterest)
-	fmt.Println(fakeTopicsOfInterest)
+
+	if doAuditing {
+		sort.Ints(fakeTopicsOfInterest)
+		return fakeTopicsOfInterest
+	}
 
 	//adds unique and new random numbers to topicOfInterests until length is satisfied
 	for _, number := range fakeTopicsOfInterest {
@@ -533,7 +579,7 @@ func addFakeInterests(topicsOfInterest []int) []int {
 		}
 	}
 
-	fmt.Println(topicsOfInterest)
+	sort.Ints(topicsOfInterest)
 
 	return topicsOfInterest
 }

+ 16 - 5
follower/follower.go

@@ -169,6 +169,17 @@ func main() {
 			C.createDb(C.int(0), C.int(dataLength))
 		}
 
+		//receives the virtualAddresses
+		virtualAddresses := make([]int, dbWriteSize+1)
+		for i := 0; i <= dbWriteSize; i++ {
+			virtualAddress := make([]byte, 4)
+			_, err = leaderConnection.Read(virtualAddress)
+			if err != nil {
+				panic(err)
+			}
+			virtualAddresses[i] = byteToInt(virtualAddress)
+		}
+
 		for i := 0; i < numThreads; i++ {
 			wg.Add(1)
 			leaderConnection, err := lnLeader.Accept()
@@ -257,6 +268,9 @@ func phase1(id int, leaderWorkerConnection net.Conn, m sync.Mutex, wg *sync.Wait
 		}
 		clientPublicKey = &tmpClientPublicKey
 
+		//auditing starts here
+		//todo!
+
 		//gets dpfQuery from leader
 		dpfLengthBytes := make([]byte, 4)
 		_, err = leaderWorkerConnection.Read(dpfLengthBytes)
@@ -280,15 +294,12 @@ func phase1(id int, leaderWorkerConnection net.Conn, m sync.Mutex, wg *sync.Wait
 			panic("dpfQueryB decryption not ok")
 		}
 
-		vector := make([]byte, dbSize*16)
 		//run dpf, xor into local db
 		for i := 0; i < dbSize; i++ {
 			ds := int(C.db[i].dataSize)
 			dataShare := make([]byte, ds)
-			pos := C.getUint128_t(C.int(i))
-			//fmt.Println(i, pos)
-			v := C.evalDPF(C.ctx[id], (*C.uchar)(&dpfQueryB[0]), pos, C.int(ds), (*C.uchar)(&dataShare[0]))
-			copy(vector[i*16:(i+1)*16], C.GoBytes(unsafe.Pointer(&v), 16))
+			pos := C.getUint128_t(C.int(virtualAddresses[i]))
+			C.evalDPF(C.ctx[id], (*C.uchar)(&dpfQueryB[0]), pos, C.int(ds), (*C.uchar)(&dataShare[0]))
 			for j := 0; j < ds; j++ {
 				db[i][j] = db[i][j] ^ dataShare[j]
 			}

+ 92 - 15
leader/leader.go

@@ -21,7 +21,9 @@ import (
 	"fmt"
 	"math"
 	"math/big"
+	mr "math/rand"
 	"net"
+	"sort"
 	"strconv"
 	"sync"
 	"time"
@@ -69,7 +71,7 @@ var numThreads = 12
 var dataLength int = 32
 
 //this needs to be adjusted peridodically
-var dbWriteSize int = 2
+var dbWriteSize int = 20
 
 //counts the number of rounds
 var round int = 1
@@ -89,6 +91,7 @@ const publisherRounds int = 3
 var publisherAmount float64
 var publisherHistory [publisherRounds]int
 
+//todo! handle client dc during phase1/3
 func main() {
 
 	generatedPublicKey, generatedPrivateKey, err := box.GenerateKey(rand.Reader)
@@ -256,6 +259,18 @@ func main() {
 		for i := 0; i < dbWriteSize; i++ {
 			C.createDb(C.int(1), C.int(dataLength))
 		}
+
+		//creates a new db containing virtual addresses for auditing
+		virtualAddresses := createVirtualAddresses()
+
+		//send all virtualAddresses to follower
+		for i := 0; i <= dbWriteSize; i++ {
+			_, err = followerConnection.Write(intToByte(virtualAddresses[i]))
+			if err != nil {
+				panic(err)
+			}
+		}
+
 		for id := 0; id < numThreads; id++ {
 			wg.Add(1)
 
@@ -265,7 +280,7 @@ func main() {
 			}
 			followerConnection.SetDeadline(time.Time{})
 
-			go phase1(id, phase, followerConnection, wg, m, startTime)
+			go phase1(id, phase, followerConnection, wg, m, startTime, virtualAddresses)
 		}
 
 		wg.Wait()
@@ -314,7 +329,7 @@ func main() {
 	}
 }
 
-func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGroup, m sync.Mutex, startTime time.Time) {
+func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGroup, m sync.Mutex, startTime time.Time, virtualAddresses []int) {
 
 	roundAsBytes := intToByte(round)
 	gotClient := make([]byte, 1)
@@ -375,6 +390,13 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 			panic(err)
 		}
 
+		//auditing starts here
+		var clientKeys = clientData[clientConnection.RemoteAddr()]
+		clientKeys, pirQuery := handlePirQuery(clientKeys, clientConnection, followerConnection, 0, true)
+		getSendVirtualAddress(pirQuery[0], virtualAddresses, clientKeys.SharedSecret, clientConnection, followerConnection)
+
+		clientData[clientConnection.RemoteAddr()] = clientKeys
+
 		//accept dpfQuery from client
 		dpfLengthBytes := make([]byte, 4)
 		_, err = clientConnection.Read(dpfLengthBytes)
@@ -414,15 +436,15 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 			panic("dpfQueryA decryption not ok")
 		}
 
-		vector := make([]byte, dbSize*16)
-		//var str string
+		//todo!
+		//eval for pos that is not in db then exchange with follower to drop or allow
+
 		//run dpf, xor into local db
 		for i := 0; i < dbSize; i++ {
 			ds := int(C.db[i].dataSize)
 			dataShare := make([]byte, ds)
-			pos := C.getUint128_t(C.int(i))
-			v := C.evalDPF(C.ctx[id], (*C.uchar)(&dpfQueryA[0]), pos, C.int(ds), (*C.uchar)(&dataShare[0]))
-			copy(vector[i*16:(i+1)*16], C.GoBytes(unsafe.Pointer(&v), 16))
+			pos := C.getUint128_t(C.int(virtualAddresses[i]))
+			C.evalDPF(C.ctx[id], (*C.uchar)(&dpfQueryA[0]), pos, C.int(ds), (*C.uchar)(&dataShare[0]))
 			for j := 0; j < ds; j++ {
 				db[i][j] = db[i][j] ^ dataShare[j]
 			}
@@ -726,7 +748,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 
 		if subPhase[0] == 0 {
 			sendTopicLists(clientConnection)
-			clientKeys, _ = handlePirQuery(clientKeys, clientConnection, followerConnection, int(subPhase[0]))
+			clientKeys, _ = handlePirQuery(clientKeys, clientConnection, followerConnection, int(subPhase[0]), false)
 		} else if subPhase[0] == 1 {
 			sendTopicLists(clientConnection)
 
@@ -734,7 +756,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 			sharedSecret = sha256.Sum256(sharedSecret[:])
 			clientKeys.SharedSecret = sharedSecret
 
-			clientKeys, _ = handlePirQuery(clientKeys, clientConnection, followerConnection, int(subPhase[0]))
+			clientKeys, _ = handlePirQuery(clientKeys, clientConnection, followerConnection, int(subPhase[0]), false)
 		}
 
 		getSendTweets(clientKeys, nil, clientConnection, followerConnection)
@@ -751,7 +773,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		}
 
 		if wantsArchive[0] == 1 && archiveTopicAmount > 0 {
-			_, archiveQuerys := handlePirQuery(clientKeys, clientConnection, followerConnection, -1)
+			_, archiveQuerys := handlePirQuery(clientKeys, clientConnection, followerConnection, -1, false)
 			getSendTweets(clientKeys, archiveQuerys, clientConnection, followerConnection)
 		}
 
@@ -784,6 +806,58 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 	}
 }
 
+func createVirtualAddresses() []int {
+	//array will be filled with unique random ascending values
+	//adapted from: https://stackoverflow.com/questions/20039025/java-array-of-unique-randomly-generated-integers
+	//+1 to have a position to evaluate each received message
+	arraySize := dbWriteSize + 1
+	var maxInt int = int(math.Pow(2, 31))
+	virtualAddresses := make([]int, arraySize)
+	for i := 0; i < arraySize; i++ {
+		virtualAddresses[i] = mr.Intn(maxInt)
+
+		for j := 0; j < i; j++ {
+			if virtualAddresses[i] == virtualAddresses[j] {
+				i--
+				break
+			}
+		}
+	}
+
+	sort.Ints(virtualAddresses)
+	return virtualAddresses
+}
+
+func getSendVirtualAddress(pirQuery []byte, virtualAddresses []int, sharedSecret [32]byte, clientConnection, followerConnection net.Conn) {
+	virtualAddress := make([]byte, 4)
+	for _, num := range pirQuery {
+		if num == 1 {
+			currentAddress := intToByte(virtualAddresses[num])
+			for i := 0; i < 4; i++ {
+				virtualAddress[i] = virtualAddress[i] ^ currentAddress[i]
+			}
+		}
+	}
+	for i := 0; i < 4; i++ {
+		virtualAddress[i] = virtualAddress[i] ^ sharedSecret[i]
+	}
+
+	followersAddress := make([]byte, 4)
+	_, err := followerConnection.Read(followersAddress)
+	if err != nil {
+		panic(err)
+	}
+
+	for i := 0; i < 4; i++ {
+		virtualAddress[i] = virtualAddress[i] ^ followersAddress[i]
+	}
+
+	_, err = clientConnection.Write(virtualAddress)
+	if err != nil {
+		panic(err)
+	}
+}
+
 func getSendTweets(clientKeys clientKeys, archiveQuerys [][]byte, clientConnection, followerConnection net.Conn) {
 	tmpNeededSubscriptions := neededSubscriptions
 	if archiveQuerys != nil {
@@ -838,7 +912,7 @@ func getSendTweets(clientKeys clientKeys, archiveQuerys [][]byte, clientConnecti
 	}
 }
 
-func handlePirQuery(clientKeys clientKeys, clientConnection net.Conn, followerConnection net.Conn, subPhase int) (clientKeys, [][]byte) {
+func handlePirQuery(clientKeys clientKeys, clientConnection net.Conn, followerConnection net.Conn, subPhase int, doAuditing bool) (clientKeys, [][]byte) {
 
 	clientPublicKey := clientKeys.PublicKey
 
@@ -880,6 +954,10 @@ func handlePirQuery(clientKeys clientKeys, clientConnection net.Conn, followerCo
 		tmpNeededSubscriptions = byteToInt(archiveNeededSubscriptions)
 		tmpTopicAmount = archiveTopicAmount
 	}
+	if doAuditing {
+		tmpNeededSubscriptions = 1
+		tmpTopicAmount = dbWriteSize
+	}
 
 	//send length to follower
 	_, err = followerConnection.Write(msgLengthBytes)
@@ -902,7 +980,6 @@ func handlePirQuery(clientKeys clientKeys, clientConnection net.Conn, followerCo
 
 	//if sharedSecret is send
 	if subPhase == 0 {
-		//bs!
 		var tmpSharedSecret [32]byte
 		for index := 0; index < 32; index++ {
 			tmpSharedSecret[index] = decrypted[index]
@@ -921,8 +998,8 @@ func handlePirQuery(clientKeys clientKeys, clientConnection net.Conn, followerCo
 		pirQuerys[i] = pirQueryFlattened[i*tmpTopicAmount : (i+1)*tmpTopicAmount]
 	}
 
-	//sets the pirQuery for the client in case whe are not archiving
-	if subPhase != -1 {
+	//sets the pirQuery for the client in case whe are not archiving, and not Auditing
+	if subPhase != -1 && !doAuditing {
 		clientKeys.PirQuery = pirQuerys
 	}
 

+ 1 - 0
lib/databaseRead.go

@@ -141,6 +141,7 @@ func tweetsToByteArray(tweetsToReturn [][]Tweet) []byte {
 		}
 		//adds padding
 		for i := len(tweetToAppend); i < minimumBlockSize; i++ {
+			//todo! replace with random chars
 			tweetToAppend = append(tweetToAppend, []byte(";")[0])
 		}
 		Xor(tweetToAppend, tweetsAsBytes)