Browse Source

added logs for eval

Simon 1 year ago
parent
commit
444ca090c7
4 changed files with 106 additions and 73 deletions
  1. 31 13
      client/client.go
  2. 1 1
      follower/follower.go
  3. 62 58
      leader/leader.go
  4. 12 1
      lib/databaseRead.go

+ 31 - 13
client/client.go

@@ -19,6 +19,7 @@ import (
 	"crypto/tls"
 	"encoding/json"
 	"fmt"
+	"log"
 	"math/big"
 	mr "math/rand"
 	"net"
@@ -58,8 +59,11 @@ var archiveTopicList []string
 var neededSubscriptions int
 var publisherAmount int
 var goodPadding int
-var timeBounds []int
-var speedUp int = 1000
+var blocksReceived int
+var timeBounds []float64
+
+//this translates to a simulated round length of ~2h
+var speedUp float64 = 7200 / ((maxTimePerRound.Seconds()) * 3)
 
 var maxTimePerRound time.Duration = 10 * time.Second
 var startTime int
@@ -75,19 +79,27 @@ var clientPrivateKey [numClients]*[32]byte
 var clientPublicKey [numClients]*[32]byte
 
 func main() {
+	f, err := os.OpenFile("evalDataClient", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
+	if err != nil {
+		log.Fatalf("error opening file: %v", err)
+	}
+	defer f.Close()
+
+	log.SetOutput(f)
+	log.Println("simulated Duration", speedUp)
 	wg := &sync.WaitGroup{}
 
 	getTimeBounds()
 
 	for i := 0; i < numClients; i++ {
 		wg.Add(1)
-		go client(i)
+		go client(i, f)
 		time.Sleep(1 * time.Millisecond)
 	}
 	wg.Wait()
 }
 
-func client(clientNumber int) {
+func client(clientNumber int, f *os.File) {
 
 	generatedPublicKey, generatedPrivateKey, err := box.GenerateKey(rand.Reader)
 	if err != nil {
@@ -170,9 +182,12 @@ func client(clientNumber int) {
 
 			tweet := getRealTweet(clientNumber)
 			if clientNumber == numClients-1 {
-				fmt.Println("publisherAmount", publisherAmount)
-				fmt.Println("goodPadding", goodPadding)
-
+				log.Println("Round", round)
+				log.Println("publisherAmount", publisherAmount)
+				log.Println("goodPadding", goodPadding)
+				log.Println("blocksReceived", blocksReceived)
+				log.Println("goodPadding Percentage", float64(goodPadding)/float64(blocksReceived))
+				publisherAmount = 0
 			}
 
 			//prep the query
@@ -257,7 +272,7 @@ func client(clientNumber int) {
 			receiveTweets(sharedSecret[clientNumber], leaderConn, false, clientNumber)
 
 			if len(archiveTopicList) > 0 {
-				wantsArchive[0] = 1
+				wantsArchive[0] = 0
 			} else {
 				wantsArchive[0] = 0
 			}
@@ -429,6 +444,9 @@ func receiveTweets(sharedSecret [2][32]byte, leaderConn net.Conn, getArchive boo
 	}
 
 	for i := 0; i < tmpNeededSubscriptions; i++ {
+		if !getArchive {
+			blocksReceived++
+		}
 		//client receives tweets
 		tweetsLengthBytes := readFrom(leaderConn, 4)
 		tweetsLength := byteToInt(tweetsLengthBytes)
@@ -468,7 +486,7 @@ func receiveTweets(sharedSecret [2][32]byte, leaderConn net.Conn, getArchive boo
 
 			}
 			ok := strings.Contains(text[0], text[1])
-			if ok {
+			if !ok {
 				goodPadding++
 				//fmt.Println("Round", round, text[0], "Length", len(tweets))
 				//fmt.Println("padding", text[1])
@@ -665,7 +683,7 @@ func getRealTweet(clientNumber int) []byte {
 		timestamp -= 1351742400
 		timestamp += startTime
 
-		if timestamp > lowerBound && timestamp < upperBound {
+		if float64(timestamp) > lowerBound && float64(timestamp) < upperBound {
 			lineArr = strings.Split(scanner.Text(), "[\"")
 			line := lineArr[1]
 			lineArr = strings.Split(line, "\"]")
@@ -718,14 +736,14 @@ func getRealTweet(clientNumber int) []byte {
 }
 
 func getTimeBounds() {
-	timeBounds = make([]int, 10000)
-	timeBounds[0] = int(time.Now().Unix())
+	timeBounds = make([]float64, 10000)
+	timeBounds[0] = float64(time.Now().Unix())
 
 	for index := range timeBounds {
 		if index == 0 {
 			continue
 		}
-		timeBounds[index] = timeBounds[index-1] + speedUp*(int(3*maxTimePerRound.Seconds())+2)
+		timeBounds[index] = timeBounds[index-1] + speedUp*(3*maxTimePerRound.Seconds()+2)
 	}
 }
 

+ 1 - 1
follower/follower.go

@@ -55,7 +55,7 @@ const mtu int = 1100
 var dbWriteSize int = 10000
 var neededSubscriptions int
 
-var round int = 0
+var round int
 var startTime time.Time
 
 func main() {

+ 62 - 58
leader/leader.go

@@ -17,12 +17,13 @@ import (
 	"crypto/x509/pkix"
 	"encoding/pem"
 	"fmt"
+	"log"
 	"math"
 	"math/big"
 	mr "math/rand"
 	"net"
+	"os"
 	"sort"
-	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -52,23 +53,25 @@ var leaderPrivateKey *[32]byte
 var leaderPublicKey *[32]byte
 var followerPublicKey *[32]byte
 
-const maxNumberOfClients = 10000000
+const maxNumberOfClients = 1000000
 
 var topicList []byte
 var topicAmount int
 var archiveTopicAmount int
 
 // every roundsBeforeUpdate the client updates his pirQuery
-const roundsBeforeUpdate = 3
-const neededSubscriptions = 4
+const roundsBeforeUpdate = 2
+const neededSubscriptions = 5
 const numThreads = 12
 const dataLength = 256
 const minDBWriteSize = 1000
 
 var dbWriteSize float64 = 10000
-var collisionCounter []float64
 
-var clientsConnected int
+//this is the number of positions for auditing
+var extraPositions int = 2
+var collisionCounter []float64
+var clientsConnected float64
 
 var maxTimePerRound time.Duration = 10 * time.Second
 
@@ -80,7 +83,7 @@ var startPhase2 time.Time
 var startPhase3 time.Time
 
 //counts the number of rounds
-var round int = 0
+var round int
 
 var startTime time.Time
 var startTimeRound time.Time
@@ -95,8 +98,30 @@ const publisherRounds int = 10
 var publisherAmount float64
 var publisherHistory [publisherRounds]int
 
+//only prints auditing time for one client per round
+var firstAuditPrint bool
+var auditingStart time.Time
+var auditingEnd time.Time
+
+var firstTweetSend bool
+
 func main() {
 
+	f, err := os.OpenFile("evalData", os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
+	if err != nil {
+		log.Fatalf("error opening file: %v", err)
+	}
+	defer f.Close()
+
+	log.SetOutput(f)
+	log.Println("roundsBeforeUpdate", roundsBeforeUpdate)
+	log.Println("neededSubscriptions", neededSubscriptions)
+	log.Println("dataLength", dataLength)
+	log.Println("maxTimePerRound~", maxTimePerRound*3)
+	log.Println("extraPositions", extraPositions)
+	log.Println("numClients", 500)
+	log.Println("Archiving is done every 24h, no Clients wants Archive")
+
 	//prevents race conditions for wrtiting
 	m := &sync.RWMutex{}
 
@@ -258,10 +283,6 @@ func main() {
 
 			phase1Channel <- clientConnection
 			clientsConnected++
-
-			if clientsConnected%1000 == 0 {
-				fmt.Println("clientsConnected", clientsConnected)
-			}
 		}
 	}()
 
@@ -282,7 +303,12 @@ func main() {
 		fmt.Println("clientsServedPhase3", clientsServedPhase3[round-1])
 		fmt.Println("dbWriteSize", dbWriteSize)
 
-		fmt.Println("Phase 1 Round", round)
+		log.Println()
+		log.Println("Phase 1 Round", round)
+		log.Println()
+		bytesSaved := lib.GetBytesSaved()
+		log.Println("bytesSaved Percentage", bytesSaved)
+		firstAuditPrint = true
 
 		//creates a new write Db for this round
 		for i := 0; i < int(dbWriteSize); i++ {
@@ -320,7 +346,9 @@ func main() {
 
 		wg.Wait()
 
-		fmt.Println("fullDurationPhase1", time.Since(startPhase1).Seconds())
+		//log.Println("fullDurationPhase1", time.Since(startPhase1).Seconds())
+		log.Println("fullAuditingDuration~", auditingEnd.Sub(auditingStart).Seconds()*clientsConnected)
+		log.Println("auditingPercentage", (auditingEnd.Sub(auditingStart).Seconds()*clientsConnected)/(time.Since(startPhase1).Seconds()))
 
 		//Phase 2
 
@@ -334,7 +362,7 @@ func main() {
 
 		phase2(followerConnection)
 
-		fmt.Println("fullDurationPhase2", time.Since(startPhase2).Seconds())
+		//log.Println("fullDurationPhase2", time.Since(startPhase2).Seconds())
 
 		//Phase 3
 
@@ -356,6 +384,8 @@ func main() {
 			continue
 		}
 
+		firstTweetSend = true
+
 		phase[0] = 3
 		startTimeRound = time.Now()
 
@@ -372,7 +402,7 @@ func main() {
 
 		wg.Wait()
 
-		fmt.Println("fullDurationPhase3", time.Since(startPhase3).Seconds())
+		log.Println("fullDurationPhase3", time.Since(startPhase3).Seconds())
 
 		lib.CleanUpdbR(round)
 	}
@@ -398,11 +428,6 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 	for clientConnection := range phase1Channel {
 		clientsServedPhase1[round] = clientsServedPhase1[round] + 1
 
-		if clientsServedPhase1[round]%1000 == 0 {
-			fmt.Println("clientsServedPhase1", clientsServedPhase1[round])
-			fmt.Println("timeTaken", time.Since(startPhase1))
-		}
-
 		gotClient[0] = 1
 		//tells follower that this worker got a clientConnection
 		writeTo(followerConnection, gotClient)
@@ -454,7 +479,9 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		}
 
 		//begin auditing
-		//auditingStart := time.Now()
+		if id == 0 && firstAuditPrint {
+			auditingStart = time.Now()
+		}
 		m.RLock()
 		var clientKeys = clientData[clientConnection.RemoteAddr()]
 		m.RUnlock()
@@ -477,8 +504,10 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 			}
 		}
 
-		if id == 0 {
-			//fmt.Println("Auditing duration", time.Since(auditingStart).Seconds())
+		if id == 0 && firstAuditPrint {
+			firstAuditPrint = false
+			auditingEnd = time.Now()
+			log.Println("Auditing duration", time.Since(auditingStart).Seconds(), "numVirtualAddresses", len(virtualAddresses))
 		}
 
 		m.Lock()
@@ -673,7 +702,7 @@ func phase2(followerConnection net.Conn) {
 	}
 
 	var tweets []lib.Tweet
-	var currentPublisherAmount int = 0
+	var currentPublisherAmount float64
 	var collisions float64
 	for i := 0; i < dbSize; i++ {
 		//discard cover message
@@ -730,23 +759,17 @@ func phase2(followerConnection net.Conn) {
 
 	collisionCounter = append(collisionCounter, collisions)
 
-	if collisions/dbWriteSize > 0.05 {
-		fmt.Println("Collision % this round", collisions/dbWriteSize, "dbWriteSize", dbWriteSize)
-	}
-
-	//fmt.Println("tweets recovered: ", tweets)
+	log.Println("Collision percentage this round", collisions/dbWriteSize, "dbWriteSize", dbWriteSize)
 
-	//sort into read db
-	//fmt.Println("newTweets", tweets)
 	lib.NewEntries(tweets, 0)
 
 	C.resetDb()
 
 	//calculates the publisherAverage over the last publisherRounds rounds
 	index := round % publisherRounds
-	publisherHistory[index] = currentPublisherAmount
+	publisherHistory[index] = int(currentPublisherAmount)
 
-	fmt.Println("currentPublisherAmount", currentPublisherAmount)
+	log.Println("currentPublisherAmount", currentPublisherAmount, "publisher percentage", currentPublisherAmount/clientsConnected)
 
 	var publisherAmount int
 	for _, num := range publisherHistory {
@@ -773,24 +796,6 @@ func phase2(followerConnection net.Conn) {
 	lib.CleanUpdbR(round)
 }
 
-func addTestTweets() {
-	//creates test tweets
-	tweets := make([]lib.Tweet, 5)
-
-	for i := range tweets {
-		j := i
-		if i == 1 {
-			j = 0
-		}
-		text := "Text " + strconv.Itoa(i)
-		var topics []string
-		topics = append(topics, "Topic "+strconv.Itoa(j))
-		tweets[i] = lib.Tweet{"", -1, topics, text, i}
-	}
-
-	lib.NewEntries(tweets, 0)
-}
-
 //opti! mb it is quicker to send updated topicLists to clients first so pirQuerys are ready
 func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGroup, m *sync.RWMutex) {
 
@@ -1026,8 +1031,8 @@ func handleClientDC(wg *sync.WaitGroup, followerConnection net.Conn, channel cha
 func createVirtualAddresses() []int {
 	//array will be filled with unique random ascending values
 	//adapted from: https://stackoverflow.com/questions/20039025/java-array-of-unique-randomly-generated-integers
-	//+1 to have a position to evaluate each received message
-	arraySize := int(dbWriteSize) + 1
+	//+extraPositions to have a position to evaluate each received message
+	arraySize := int(dbWriteSize) + extraPositions
 	var maxInt int = int(math.Pow(2, 31))
 	virtualAddresses := make([]int, arraySize)
 	for i := 0; i < arraySize; i++ {
@@ -1085,7 +1090,6 @@ func getSendTweets(clientKeys clientKeys, archiveQuerys [][]byte, clientConnecti
 			tmpNeededSubscriptions = archiveTopicAmount
 		}
 	}
-	//fmt.Println("tmpNeededSubscriptions", tmpNeededSubscriptions)
 
 	tweets := make([][]byte, tmpNeededSubscriptions)
 	for i := 0; i < tmpNeededSubscriptions; i++ {
@@ -1106,20 +1110,20 @@ func getSendTweets(clientKeys clientKeys, archiveQuerys [][]byte, clientConnecti
 		//Xor's sharedSecret with all tweets
 		lib.Xor(expandedSharedSecret[:], tweets[i])
 
-		//fmt.Println(tweets[0])
-
 		blockLength := len(tweets[i])
 
 		receivedTweets, _ := readFrom(followerConnection, blockLength, nil, 0)
 
-		//fmt.Println("receivedTweets", blockLength, len(receivedTweets))
-		//fmt.Println("pubKey", *clientKeys.PublicKey, "Bytes", tweets, "follower", receivedTweets)
 		lib.Xor(receivedTweets, tweets[i])
 	}
 
 	//sends tweets to client
 	for i := 0; i < tmpNeededSubscriptions; i++ {
 		tweetsLengthBytes := intToByte(len(tweets[i]))
+		if firstTweetSend && archiveQuerys == nil {
+			firstTweetSend = false
+			log.Println("sending", len(tweets[0]), "bytes of data")
+		}
 		errorBool := writeToWError(clientConnection, tweetsLengthBytes, followerConnection, 2)
 		if errorBool {
 			return true

+ 12 - 1
lib/databaseRead.go

@@ -24,7 +24,9 @@ var archive = make(map[string][]Tweet)
 var minimumBlockSize int
 
 //needs to be dividable by roundsBeforUpdate
-var roundsBeforeArchiving = -1
+var roundsBeforeArchiving = 12
+
+var bytesSaved float64
 
 var topicList []string
 
@@ -54,6 +56,9 @@ func NewEntries(inputTweets []Tweet, whereTo int) {
 			} else {
 				//known tweet
 				//setting pointer for all other Topics
+				if whereTo == 1 {
+					bytesSaved += float64(len(tweet.Topics)) + float64(len(tweet.Text))
+				}
 				topic := tweet.Topics[index]
 				var pointerTweet Tweet
 				pointerTweet.TopicPointer = tweet.Topics[0]
@@ -125,6 +130,9 @@ func getNamesForTopics(wantedIndices []byte, whereFrom int) []string {
 		tmpTopicList = archiveTopicList
 	}
 	for index, element := range wantedIndices {
+		if index == len(tmpTopicList) {
+			break
+		}
 		if element == 1 {
 			topicNames = append(topicNames, tmpTopicList[index])
 		}
@@ -302,5 +310,8 @@ func CleanUpdbR(round int) {
 
 		NewEntries(tweetsToMain, 0)
 	}
+}
 
+func GetBytesSaved() float64 {
+	return bytesSaved / float64(len(archive))
 }