Prechádzať zdrojové kódy

bugfixes, preparation for eval/realTweets

Simon 2 rokov pred
rodič
commit
de446fa5a1
5 zmenil súbory, kde vykonal 198 pridanie a 43 odobranie
  1. 2 1
      .gitignore
  2. 107 8
      client/client.go
  3. 5 5
      follower/follower.go
  4. 82 27
      leader/leader.go
  5. 2 2
      lib/databaseRead.go

+ 2 - 1
.gitignore

@@ -1 +1,2 @@
-testing/
+testing/
+tweets/

+ 107 - 8
client/client.go

@@ -13,6 +13,7 @@ import "C"
 //sssssssssssss
 import (
 	"2PPS/lib"
+	"bufio"
 	"bytes"
 	"crypto/rand"
 	"crypto/sha256"
@@ -22,6 +23,7 @@ import (
 	"math/big"
 	mr "math/rand"
 	"net"
+	"os"
 	"sort"
 	"strconv"
 	"strings"
@@ -40,8 +42,8 @@ type tweet struct {
 const leader string = "127.0.0.1:4441"
 
 //needs to be changed at leader/follower/client at the same time
-const numClients = 200
-const dataLength int = 128
+const numClients = 4000
+const dataLength int = 256
 const numThreads int = 12
 
 //Maximum Transport Unit
@@ -53,6 +55,9 @@ var topicList []string
 var archiveTopicList []string
 var neededSubscriptions int
 
+var maxTimePerRound time.Duration = 100 * time.Second
+var startTime int
+
 //todo! expand this for multiple clients
 var archiveInterests = make([]int, 1)
 var sharedSecret [numClients][2][32]byte = createSharedSecret()
@@ -70,6 +75,7 @@ func main() {
 	for i := 0; i < numClients; i++ {
 		wg.Add(1)
 		go client(i)
+		time.Sleep(10 * time.Millisecond)
 	}
 	wg.Wait()
 }
@@ -120,6 +126,9 @@ func client(clientNumber int) {
 	neededSubscriptionsBytes := readFrom(leaderConn, 4)
 	neededSubscriptions = byteToInt(neededSubscriptionsBytes)
 
+	startTimeBytes := readFrom(leaderConn, 4)
+	startTime = byteToInt(startTimeBytes)
+
 	//setup ends above
 	//while client is active he is always connected and has to participate
 
@@ -127,8 +136,6 @@ func client(clientNumber int) {
 		//gets current phase
 		phase := readFrom(leaderConn, 1)
 
-		//fmt.Println("Phase ", phase[0])
-
 		if phase[0] == 1 {
 
 			//gets current dbWriteSize from leader
@@ -145,12 +152,16 @@ func client(clientNumber int) {
 
 			round = byteToInt(roundAsBytes)
 
+			if clientNumber == 0 {
+				fmt.Println("Round ", round)
+			}
+
 			//request virtualAddress from leader via pirQuery
 			encryptedQueryLeader, encryptedQueryFollower := createAuditPIRQuery(clientNumber)
 			sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false)
 			pos := receiveVirtualAddress(sharedSecret[clientNumber], leaderConn)
 
-			tweet := getTweet(clientNumber)
+			tweet := getRandomTweet(clientNumber)
 
 			//prep the query
 			dataSize := len(tweet)
@@ -435,7 +446,7 @@ func receiveTweets(sharedSecret [2][32]byte, leaderConn net.Conn, getArchive boo
 
 		index := strings.Index(string(tweets), ";;;")
 		if index != -1 {
-			fmt.Println("Round", round, "Tweets length", len(tweets))
+			//fmt.Println("Round", round, "Tweets length", len(tweets))
 			/*
 				fmt.Println("Correct")
 				fmt.Println("PubKey", clientPublicKey[clientNumber])
@@ -582,7 +593,92 @@ func receiveTopicLists(leaderConn net.Conn) {
 	}
 }
 
-func getTweet(clientNumber int) []byte {
+func getRealTweet(clientNumber int) []byte {
+
+	fUserList, err := os.Create("tweets/userList")
+	if err != nil {
+		panic(err)
+	}
+
+	defer fUserList.Close()
+
+	currentLine := 0
+
+	scanner := bufio.NewScanner(fUserList)
+	userID := ""
+
+	for scanner.Scan() {
+		if currentLine == clientNumber {
+			userID = scanner.Text()
+			break
+		}
+		currentLine++
+	}
+
+	if userID == "" {
+		panic("no userID picked")
+	}
+
+	fTweets, err := os.Open("tweets/userTweets/" + userID)
+	if err != nil {
+		panic(err)
+	}
+
+	defer fTweets.Close()
+
+	scanner = bufio.NewScanner(fTweets)
+
+	lowerBound := time.Now().Second()
+	upperBound := lowerBound + int(3*maxTimePerRound.Seconds()) + 10
+
+	var tweet []byte
+
+	for scanner.Scan() {
+		lineArr := strings.Split(scanner.Text(), ", \"hashtags\"")
+		lineArr = strings.Split(lineArr[0], ": ")
+		timestamp, _ := strconv.Atoi(lineArr[len(lineArr)])
+
+		//transforms timestamp to current time
+		timestamp -= 1351742400
+		timestamp += startTime
+
+		if timestamp > lowerBound && timestamp < upperBound {
+			lineArr = strings.Split(scanner.Text(), "[\"")
+			line := lineArr[1]
+			lineArr = strings.Split(line, "\"]")
+			line = lineArr[0]
+			lineArr = strings.Split(line, ",")
+			var topics []byte
+			for _, topic := range lineArr {
+				topicLine := strings.Split(topic, "\"")
+				topics = append(topics, []byte(topicLine[1])[:]...)
+				topics = append(topics, []byte(",")[0])
+			}
+
+			topics = topics[:len(topics)-1]
+
+			tweet = append(tweet, topics...)
+			tweet = append(tweet, []byte(";")[0])
+
+			r := mr.New(mr.NewSource(time.Now().UnixNano()))
+			num := r.Intn(10000)
+			if num == 0 {
+				num = 1
+			}
+			tweet = append(tweet, []byte(strconv.Itoa(num) + ";;")[:]...)
+
+		}
+	}
+	//adds padding
+	length := dataLength - len(tweet)
+	padding := make([]byte, length)
+	rand.Read(padding)
+	tweet = append(tweet, padding...)
+
+	return tweet
+}
+
+func getRandomTweet(clientNumber int) []byte {
 	var tweet []byte
 
 	r := mr.New(mr.NewSource(time.Now().UnixNano()))
@@ -627,13 +723,16 @@ func getTweet(clientNumber int) []byte {
 	}
 
 	text := []byte(strconv.Itoa(num) + ";")
-
 	tweet = append(tweet, topics...)
 	tweet = append(tweet, text...)
 	tweet = append(tweet, []byte(";")[0])
 
 	//fmt.Println("writing", string(text))
+	//fmt.Println(topicNumbers)
 
+	if len(tweet) > 32 {
+		fmt.Println("lenlen", len(tweet))
+	}
 	//adds padding
 	length := dataLength - len(tweet)
 	padding := make([]byte, length)

+ 5 - 5
follower/follower.go

@@ -47,17 +47,15 @@ var followerPublicKey *[32]byte
 var leaderPublicKey *[32]byte
 
 //needs to be changed at leader/follower/client at the same time
-const dataLength = 128
+const dataLength = 64
 const numThreads = 12
 
 //Maximum Transport Unit
 const mtu int = 1100
 
-var dbWriteSize int = 100
+var dbWriteSize int = 20000
 var neededSubscriptions int
 
-var maxTimePerRound time.Duration = 5 * time.Second
-
 var round int = 0
 var startTime time.Time
 
@@ -435,7 +433,9 @@ func phase2(leaderWorkerConnection net.Conn) {
 
 			tweet := lib.Tweet{"", -1, topics, text, round}
 
-			tweets = append(tweets, tweet)
+			if text != "" {
+				tweets = append(tweets, tweet)
+			}
 		}
 	}
 

+ 82 - 27
leader/leader.go

@@ -61,19 +61,23 @@ var topicAmount int
 var archiveTopicAmount int
 
 // every roundsBeforeUpdate the client updates his pirQuery
-const roundsBeforeUpdate = 1
+const roundsBeforeUpdate = 5
 const neededSubscriptions = 1
 const numThreads = 12
-const dataLength = 128
+const dataLength = 64
+const minDBWriteSize = 1000
 
-var dbWriteSize int = 100
+var dbWriteSize float64 = 20000
+var collisionCounter []float64
 
-const minDBWriteSize int = 10
+var maxTimePerRound time.Duration = 100 * time.Second
 
-var maxTimePerRound time.Duration = 5 * time.Second
+var clientsServedPhase1 []int
+var clientsServedPhase3 []int
 
-var clientsServedPhase1 int = 0
-var clientsServedPhase3 int = 0
+var startPhase1 time.Time
+var startPhase2 time.Time
+var startPhase3 time.Time
 
 //counts the number of rounds
 var round int = 0
@@ -85,7 +89,7 @@ var phase1Channel = make(chan net.Conn, maxNumberOfClients)
 var phase3Channel = make(chan net.Conn, maxNumberOfClients)
 
 //variables for calculating the dbWrite size
-const publisherRounds int = 3
+const publisherRounds int = 10
 
 var publisherAmount float64
 var publisherHistory [publisherRounds]int
@@ -96,6 +100,9 @@ func main() {
 	//prevents race conditions for wrtiting
 	m := &sync.RWMutex{}
 
+	clientsServedPhase1 := make([]int, 1000)
+	clientsServedPhase3 := make([]int, 1000)
+
 	generatedPublicKey, generatedPrivateKey, err := box.GenerateKey(rand.Reader)
 	if err != nil {
 		panic(err)
@@ -224,6 +231,13 @@ func main() {
 				break
 			}
 
+			_, err = clientConnection.Write(intToByte(startTime.Second()))
+			if err != nil {
+				fmt.Println("error", err)
+				clientConnection.Close()
+				break
+			}
+
 			clientPublicKey = &tmpClientPublicKey
 
 			//this is the key for map(client data)
@@ -248,30 +262,28 @@ func main() {
 	phase := make([]byte, 1)
 
 	for {
-
 		startTime = time.Now()
+		startPhase1 = time.Now()
+
 		phase[0] = 1
 
 		round++
 
-		fmt.Println("clientsServedPhase1", clientsServedPhase1)
-		fmt.Println("clientsServedPhase3", clientsServedPhase3)
+		fmt.Println("clientsServedPhase1", clientsServedPhase1[round-1])
+		fmt.Println("clientsServedPhase3", clientsServedPhase3[round-1])
 		fmt.Println("dbWriteSize", dbWriteSize)
 
-		clientsServedPhase1 = 0
-		clientsServedPhase3 = 0
-
 		fmt.Println("Phase 1 Round", round)
 
 		//creates a new write Db for this round
-		for i := 0; i < dbWriteSize; i++ {
+		for i := 0; i < int(dbWriteSize); i++ {
 			C.createDb(C.int(1), C.int(dataLength))
 		}
 
 		//creates a new db containing virtual addresses for auditing
 		virtualAddresses := createVirtualAddresses()
 		//send all virtualAddresses to follower
-		for i := 0; i <= dbWriteSize; i++ {
+		for i := 0; i <= int(dbWriteSize); i++ {
 			writeTo(followerConnection, intToByte(virtualAddresses[i]))
 		}
 
@@ -289,8 +301,12 @@ func main() {
 
 		wg.Wait()
 
+		fmt.Println("fullDurationPhase1", time.Since(startPhase1).Seconds())
+
 		//Phase 2
 
+		startPhase2 = time.Now()
+
 		followerConnection, err := tls.Dial("tcp", follower, conf)
 		if err != nil {
 			panic(err)
@@ -299,8 +315,12 @@ func main() {
 
 		phase2(followerConnection)
 
+		fmt.Println("fullDurationPhase2", time.Since(startPhase2).Seconds())
+
 		//Phase 3
 
+		startPhase3 = time.Now()
+
 		//no tweets -> continue to phase 1 and mb get tweets
 		topicList, topicAmount = lib.GetTopicList(0)
 		if len(topicList) == 0 {
@@ -323,6 +343,8 @@ func main() {
 
 		wg.Wait()
 
+		fmt.Println("fullDurationPhase3", time.Since(startPhase3).Seconds())
+
 		lib.CleanUpdbR(round)
 	}
 }
@@ -345,7 +367,12 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 	}
 
 	for clientConnection := range phase1Channel {
-		clientsServedPhase1++
+		clientsServedPhase1[round] = clientsServedPhase1[round] + 1
+
+		if clientsServedPhase1[round]%1000 == 0 {
+			fmt.Println("clientsServedPhase1", clientsServedPhase1[round])
+			fmt.Println("timeTaken", time.Since(startPhase1))
+		}
 
 		gotClient[0] = 1
 		//tells follower that this worker got a clientConnection
@@ -376,7 +403,7 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		}
 
 		//tells client current dbWriteSize
-		errorBool = writeToWError(clientConnection, intToByte(dbWriteSize), followerConnection, 5)
+		errorBool = writeToWError(clientConnection, intToByte(int(dbWriteSize)), followerConnection, 5)
 		if errorBool {
 			contBool := handleClientDC(wg, followerConnection, phase1Channel)
 			if contBool {
@@ -397,6 +424,8 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 			}
 		}
 
+		//begin auditing
+		auditingStart := time.Now()
 		m.RLock()
 		var clientKeys = clientData[clientConnection.RemoteAddr()]
 		m.RUnlock()
@@ -419,6 +448,10 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 			}
 		}
 
+		if id == 0 {
+			fmt.Println("Auditing duration", time.Since(auditingStart))
+		}
+
 		m.Lock()
 		clientData[clientConnection.RemoteAddr()] = clientKeys
 		m.Unlock()
@@ -470,7 +503,7 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 
 		ds := int(C.db[0].dataSize)
 		dataShareLeader := make([]byte, ds)
-		pos := C.getUint128_t(C.int(virtualAddresses[dbWriteSize]))
+		pos := C.getUint128_t(C.int(virtualAddresses[int(dbWriteSize)]))
 		C.evalDPF(C.ctx[id], (*C.uchar)(&dpfQueryA[0]), pos, C.int(ds), (*C.uchar)(&dataShareLeader[0]))
 
 		dataShareFollower, _ := readFrom(followerConnection, ds, nil, 0)
@@ -612,11 +645,16 @@ func phase2(followerConnection net.Conn) {
 
 	var tweets []lib.Tweet
 	var currentPublisherAmount int = 0
+	var collisions float64
 	for i := 0; i < dbSize; i++ {
 		//discard cover message
 		if tmpdb[i][1] == 0 {
 			continue
+			//collision
 		} else if -1 == strings.Index(string(tmpdb[i]), ";;") {
+			currentPublisherAmount++
+			currentPublisherAmount++
+			collisions++
 			continue
 		} else {
 			currentPublisherAmount++
@@ -649,13 +687,25 @@ func phase2(followerConnection net.Conn) {
 
 			tweet := lib.Tweet{"", -1, topics, text, round}
 
-			tweets = append(tweets, tweet)
+			if text != "" {
+				tweets = append(tweets, tweet)
+			} else {
+				//this is a odd(number) way collisions
+				collisions++
+			}
 		}
 	}
 
+	collisionCounter = append(collisionCounter, collisions)
+
+	if collisions/dbWriteSize > 0.05 {
+		fmt.Println("Collisions this round", collisions, "dbWriteSize", dbWriteSize)
+	}
+
 	//fmt.Println("tweets recovered: ", tweets)
 
 	//sort into read db
+	//fmt.Println("newTweets", tweets)
 	lib.NewEntries(tweets, 0)
 
 	C.resetDb()
@@ -677,14 +727,14 @@ func phase2(followerConnection net.Conn) {
 	}
 
 	//calculates the dbWriteSize for this round
-	dbWriteSize = int(math.Ceil(19.5 * float64(publisherAverage)))
+	dbWriteSize = math.Ceil(19.5 * float64(publisherAverage))
 
 	if dbWriteSize < minDBWriteSize {
 		dbWriteSize = minDBWriteSize
 	}
 
 	//writes dbWriteSize of current round to follower
-	writeTo(followerConnection, intToByte(dbWriteSize))
+	writeTo(followerConnection, intToByte(int(dbWriteSize)))
 }
 
 func addTestTweets() {
@@ -713,7 +763,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 
 	//wait until time is up
 	for len(phase3Channel) == 0 {
-		if time.Since(startTime) > maxTimePerRound {
+		if time.Since(startTime) > maxTimePerRound*2 {
 			//tells follower that this worker is done
 			writeToWError(followerConnection, gotClient, nil, 0)
 			wg.Done()
@@ -723,7 +773,12 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 	}
 
 	for clientConnection := range phase3Channel {
-		clientsServedPhase3++
+		clientsServedPhase3[round] = clientsServedPhase3[round] + 1
+
+		if clientsServedPhase3[round]%1000 == 0 {
+			fmt.Println("clientsServedPhase3", clientsServedPhase3[round])
+			fmt.Println("timeTaken", time.Since(startPhase3).Seconds())
+		}
 
 		gotClient[0] = 1
 		//tells follower that this worker got a clientConnection
@@ -886,7 +941,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		phase1Channel <- clientConnection
 
 		for {
-			if time.Since(startTime) < maxTimePerRound {
+			if time.Since(startTime) < 2*maxTimePerRound {
 				//this worker handles the next client
 				if len(phase3Channel) > 0 {
 					break
@@ -936,7 +991,7 @@ func createVirtualAddresses() []int {
 	//array will be filled with unique random ascending values
 	//adapted from: https://stackoverflow.com/questions/20039025/java-array-of-unique-randomly-generated-integers
 	//+1 to have a position to evaluate each received message
-	arraySize := dbWriteSize + 1
+	arraySize := int(dbWriteSize) + 1
 	var maxInt int = int(math.Pow(2, 31))
 	virtualAddresses := make([]int, arraySize)
 	for i := 0; i < arraySize; i++ {
@@ -1083,7 +1138,7 @@ func handlePirQuery(clientKeys clientKeys, clientConnection net.Conn, followerCo
 
 	if doAuditing {
 		tmpNeededSubscriptions = 1
-		tmpTopicAmount = dbWriteSize
+		tmpTopicAmount = int(dbWriteSize)
 	}
 
 	//send length to follower

+ 2 - 2
lib/databaseRead.go

@@ -71,7 +71,8 @@ func NewEntries(inputTweets []Tweet, whereTo int) {
 
 //todo! add round to pirquery only get tweets that have been posted from that round onward
 func GetTweets(pirQuery []byte, dataLength int, whereFrom int, pubKey [32]byte) []byte {
-	//fmt.Println(pirQuery, pubKey)
+	//fmt.Println("query", pirQuery)
+	//fmt.Println("dbR", dbR)
 	tmpdb := dbR
 	if whereFrom == 1 {
 		tmpdb = archive
@@ -89,7 +90,6 @@ func GetTweets(pirQuery []byte, dataLength int, whereFrom int, pubKey [32]byte)
 			} else {
 				//"copied" tweet
 				//find tweet with pointers
-				//fmt.Println(tweet)
 				tweet = tmpdb[tweet.TopicPointer][tweet.TextPointer]
 				tweet.RoundPosted = 0
 				tweetsToReturn[index] = append(tweetsToReturn[index], tweet)