Browse Source

implemented adaptive dbWriteSize

Simon 2 years ago
parent
commit
20a37454a2
3 changed files with 50 additions and 24 deletions
  1. 10 3
      client/client.go
  2. 10 6
      follower/follower.go
  3. 30 15
      leader/leader.go

+ 10 - 3
client/client.go

@@ -50,7 +50,7 @@ var wantsArchive = make([]byte, 1)
 
 var dataLen int = 32
 var numThreads int = 12
-var numRows int = 2
+var dbWriteSize int = 2
 
 var leaderPublicKey *[32]byte
 var followerPublicKey *[32]byte
@@ -165,6 +165,14 @@ func client(tweet []byte, clientNumber int) {
 
 		if phase[0] == 1 {
 
+			//gets current dbWriteSize from leader
+			dbWriteSizeBytes := make([]byte, 4)
+			_, err = leaderConn.Read(dbWriteSizeBytes)
+			if err != nil {
+				panic(err)
+			}
+			dbWriteSize = byteToInt(dbWriteSizeBytes)
+
 			//todo! put into tweet creation
 			roundAsBytes := make([]byte, 4)
 			_, err = leaderConn.Read(roundAsBytes)
@@ -180,8 +188,7 @@ func client(tweet []byte, clientNumber int) {
 			var dpfQueryA *C.uchar
 			var dpfQueryB *C.uchar
 
-			//change to clientNumber
-			C.prepQuery(C.int(0), C.int(numRows), (*C.uchar)(&tweet[0]), C.int(dataSize), &cQuerySize, &dpfQueryA, &dpfQueryB)
+			C.prepQuery(C.int(clientNumber), C.int(dbWriteSize), (*C.uchar)(&tweet[0]), C.int(dataSize), &cQuerySize, &dpfQueryA, &dpfQueryB)
 
 			intQuerySize := int(cQuerySize) //byteToInt(querySize)
 

+ 10 - 6
follower/follower.go

@@ -54,7 +54,7 @@ var numThreads int = 12
 
 //has to be dividable by 32
 var dataLength int = 32
-var numRows int = 2
+var dbWriteSize int = 2
 
 var round int = 1
 var startTime time.Time
@@ -165,7 +165,7 @@ func main() {
 		fmt.Println("phase1")
 
 		//create write db for this round
-		for i := 0; i < numRows; i++ {
+		for i := 0; i < dbWriteSize; i++ {
 			C.createDb(C.int(0), C.int(dataLength))
 		}
 
@@ -319,8 +319,6 @@ func phase1(id int, leaderWorkerConnection net.Conn, m sync.Mutex, wg *sync.Wait
 			pos := C.getUint128_t(C.int(i))
 			//fmt.Println(i, pos)
 			v := C.evalDPF(C.ctx[id], (*C.uchar)(&dpfQueryB[0]), pos, C.int(ds), (*C.uchar)(&dataShare[0]))
-			fmt.Println("")
-			fmt.Println("v", i, v)
 			copy(vector[i*16:(i+1)*16], C.GoBytes(unsafe.Pointer(&v), 16))
 			for j := 0; j < ds; j++ {
 				db[i][j] = db[i][j] ^ dataShare[j]
@@ -533,6 +531,14 @@ func phase2(leaderWorkerConnection net.Conn) {
 
 	//reset write db after the tweets were moved to read db
 	C.resetDb()
+
+	//gets current dbWriteSize from leader
+	dbWriteSizeBytes := make([]byte, 4)
+	_, err = leaderWorkerConnection.Read(dbWriteSizeBytes)
+	if err != nil {
+		panic(err)
+	}
+	dbWriteSize = byteToInt(dbWriteSizeBytes)
 }
 
 func addTestTweets() {
@@ -628,8 +634,6 @@ func getSendTweets(clientKeys clientKeys, archiveQuerys [][]byte, leaderWorkerCo
 			expandedSharedSecret = append(expandedSharedSecret, clientKeys.SharedSecret[:]...)
 		}
 
-		fmt.Println(expandedSharedSecret)
-
 		//Xor's sharedSecret with all tweets
 		lib.Xor(expandedSharedSecret[:], tweets)
 

+ 30 - 15
leader/leader.go

@@ -19,6 +19,7 @@ import (
 	"crypto/x509/pkix"
 	"encoding/pem"
 	"fmt"
+	"math"
 	"math/big"
 	"net"
 	"strconv"
@@ -68,7 +69,7 @@ var numThreads = 12
 var dataLength int = 32
 
 //this needs to be adjusted peridodically
-var numRows int = 2
+var dbWriteSize int = 2
 
 //counts the number of rounds
 var round int = 1
@@ -82,6 +83,9 @@ var maxTimePerRound time.Duration = 5 * time.Second
 var phase1Channel = make(chan net.Conn, maxNumberOfClients)
 var phase3Channel = make(chan net.Conn, maxNumberOfClients)
 
+//variable for calculating the dbWrite size
+var publisherNumber float64
+
 func main() {
 
 	generatedPublicKey, generatedPrivateKey, err := box.GenerateKey(rand.Reader)
@@ -240,7 +244,7 @@ func main() {
 		fmt.Println("phase1")
 
 		//creates a new write Db for this round
-		for i := 0; i < numRows; i++ {
+		for i := 0; i < dbWriteSize; i++ {
 			C.createDb(C.int(1), C.int(dataLength))
 		}
 		for id := 0; id < numThreads; id++ {
@@ -354,6 +358,14 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 			panic(err)
 		}
 
+		//tells client current dbWriteSize
+		_, err = clientConnection.Write(intToByte(dbWriteSize))
+		if err != nil {
+			panic(err)
+		}
+
+		fmt.Println("SizeToClient", dbWriteSize)
+
 		//tells client current round
 		_, err = clientConnection.Write(roundAsBytes)
 		if err != nil {
@@ -470,15 +482,6 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 			dataShare := make([]byte, ds)
 			pos := C.getUint128_t(C.int(i))
 			v := C.evalDPF(C.ctx[id], (*C.uchar)(&dpfQueryA[0]), pos, C.int(ds), (*C.uchar)(&dataShare[0]))
-			fmt.Println("")
-			fmt.Println("v", i, v)
-			/*if i == 99 {
-				for i := 0; i < 16; i++ {
-					str = str + strconv.FormatInt(int64(v[i]), 2)
-				}
-				fmt.Println(strconv.ParseInt(str, 2, len(str)))
-			}*/
-
 			copy(vector[i*16:(i+1)*16], C.GoBytes(unsafe.Pointer(&v), 16))
 			for j := 0; j < ds; j++ {
 				db[i][j] = db[i][j] ^ dataShare[j]
@@ -671,16 +674,16 @@ func phase2(followerConnection net.Conn) {
 	//decrypt each row
 	for i := 0; i < dbSize; i++ {
 		C.decryptRow(C.int(i), (*C.uchar)(&tmpdb[i][0]), (*C.uchar)(&tmpdbLeader[i][0]), (*C.uchar)(&tmpdbFollower[i][0]), (*C.uchar)(&seedLeader[0]), (*C.uchar)(&seedFollower[0]))
-		fmt.Println(i, tmpdb[i])
-		fmt.Println("")
 	}
 
 	var tweets []lib.Tweet
+	publisherNumber = 0
 	for i := 0; i < dbSize; i++ {
 		//discard cover message
 		if tmpdb[i][0] == 0 {
 			continue
 		} else {
+			publisherNumber++
 			//reconstruct tweet
 			var position int = 0
 			var topics []string
@@ -719,6 +722,20 @@ func phase2(followerConnection net.Conn) {
 	lib.NewEntries(tweets, 0)
 
 	C.resetDb()
+
+	//calculates the dbWriteSize for this round
+	if publisherNumber > 0 {
+		dbWriteSize = int(math.Ceil(19.5 * publisherNumber))
+	} else {
+		//default, handles 100 clients
+		dbWriteSize = 1950
+	}
+
+	//writes dbWriteSize of current round to follower
+	_, err = followerConnection.Write(intToByte(dbWriteSize))
+	if err != nil {
+		panic(err)
+	}
 }
 
 func addTestTweets() {
@@ -900,8 +917,6 @@ func getSendTweets(clientKeys clientKeys, archiveQuerys [][]byte, clientConnecti
 			expandedSharedSecret = append(expandedSharedSecret, clientKeys.SharedSecret[:]...)
 		}
 
-		fmt.Println(expandedSharedSecret)
-
 		//Xor's sharedSecret with all tweets
 		lib.Xor(expandedSharedSecret[:], tweets)