Sfoglia il codice sorgente

works with real tweets

Simon 2 anni fa
parent
commit
0c6cc9a314
4 ha cambiato i file con 90 aggiunte e 67 eliminazioni
  1. 35 21
      client/client.go
  2. 2 2
      follower/follower.go
  3. 48 44
      leader/leader.go
  4. 5 0
      lib/databaseRead.go

+ 35 - 21
client/client.go

@@ -42,11 +42,11 @@ type tweet struct {
 const leader string = "127.0.0.1:4441"
 
 //needs to be changed at leader/follower/client at the same time
-const numClients = 10000
+const numClients = 2000
 
 //mylimit=8000
 //sudo prlimit --nofile=$mylimit --pid $$; ulimit -n $mylimit
-const dataLength int = 64
+const dataLength int = 256
 const numThreads int = 12
 
 //Maximum Transport Unit
@@ -58,7 +58,7 @@ var topicList []string
 var archiveTopicList []string
 var neededSubscriptions int
 
-var maxTimePerRound time.Duration = 10 * time.Second
+var maxTimePerRound time.Duration = 30 * time.Second
 var startTime int
 
 //todo! expand this for multiple clients
@@ -448,13 +448,14 @@ func receiveTweets(sharedSecret [2][32]byte, leaderConn net.Conn, getArchive boo
 
 		//fmt.Println("PubKey", clientPublicKey[clientNumber], "Bytes", tweets)
 
-		index := strings.Index(string(tweets), ";;;")
+		index := strings.Index(string(tweets), ";;")
 		if index != -1 && clientNumber == 0 {
 			//fmt.Println("Round", round, "Tweets length", len(tweets))
 			fmt.Println("Correct")
-			text := string(tweets)[:index]
+			textArr := strings.Split(string(tweets), ";;;;;;;;")
+			text := string(tweets)[:len(textArr)-1]
 			fmt.Println("Round", round, "Text", text[:5], "Length", len(tweets))
-		} else {
+		} else if index == -1 {
 			fmt.Println("error")
 			fmt.Println("round", round, "text:", string(tweets), "length", len(tweets))
 			return
@@ -608,7 +609,7 @@ func getRealTweet(clientNumber int) []byte {
 	userID := ""
 
 	for scanner.Scan() {
-		if currentLine-1 == clientNumber {
+		if currentLine == clientNumber {
 			userID = scanner.Text()
 			break
 		}
@@ -628,15 +629,18 @@ func getRealTweet(clientNumber int) []byte {
 
 	scanner = bufio.NewScanner(fTweets)
 
-	lowerBound := time.Now().Second()
+	lowerBound := int(time.Now().Unix())
 	upperBound := lowerBound + int(3*maxTimePerRound.Seconds()) + 10
 
 	var tweet []byte
 
 	for scanner.Scan() {
+
 		lineArr := strings.Split(scanner.Text(), ", \"hashtags\"")
 		lineArr = strings.Split(lineArr[0], ": ")
-		timestamp, _ := strconv.Atoi(lineArr[len(lineArr)-1])
+		lineArr = strings.Split(lineArr[1], " \"")
+
+		timestamp, _ := strconv.Atoi(lineArr[0])
 
 		//transforms timestamp to current time
 		timestamp -= 1351742400
@@ -648,11 +652,20 @@ func getRealTweet(clientNumber int) []byte {
 			lineArr = strings.Split(line, "\"]")
 			line = lineArr[0]
 			lineArr = strings.Split(line, ",")
+			line = strings.Join(lineArr, "")
+			topicLine := strings.Split(line, "\"")
+
 			var topics []byte
-			for _, topic := range lineArr {
-				topicLine := strings.Split(topic, "\"")
-				topics = append(topics, []byte(topicLine[1])[:]...)
+			for index, topic := range topicLine {
+				if index%2 == 1 {
+					continue
+				}
+				if len(topics)+len(topic) > dataLength-10 {
+					break
+				}
+				topics = append(topics, []byte(topic)[:]...)
 				topics = append(topics, []byte(",")[0])
+
 			}
 
 			topics = topics[:len(topics)-1]
@@ -666,18 +679,19 @@ func getRealTweet(clientNumber int) []byte {
 				num = 1
 			}
 			tweet = append(tweet, []byte(strconv.Itoa(num) + ";;")[:]...)
-			break
+			//fmt.Println("tweet", string(tweet))
+
+			//adds padding
+			length := dataLength - len(tweet)
+			padding := make([]byte, length)
+			rand.Read(padding)
+			tweet = append(tweet, padding...)
+
+			return tweet
 		}
 	}
 
-	//todo! doesnt work
-	fmt.Println("tweet", string(tweet))
-
-	//adds padding
-	length := dataLength - len(tweet)
-	padding := make([]byte, length)
-	rand.Read(padding)
-	tweet = append(tweet, padding...)
+	tweet = make([]byte, dataLength)
 
 	return tweet
 }

+ 2 - 2
follower/follower.go

@@ -47,7 +47,7 @@ var followerPublicKey *[32]byte
 var leaderPublicKey *[32]byte
 
 //needs to be changed at leader/follower/client at the same time
-const dataLength = 64
+const dataLength = 256
 const numThreads = 12
 
 //Maximum Transport Unit
@@ -423,7 +423,7 @@ func phase2(leaderWorkerConnection net.Conn) {
 							topic = ""
 						} else {
 							//change this works for ints, need to be changed for strings
-							topic = topic + fmt.Sprint((int(letter)))
+							topic = topic + string(letter)
 						}
 					} else if position == 1 {
 						text = text + string(letter)

+ 48 - 44
leader/leader.go

@@ -64,7 +64,7 @@ var archiveTopicAmount int
 const roundsBeforeUpdate = 5
 const neededSubscriptions = 1
 const numThreads = 12
-const dataLength = 64
+const dataLength = 256
 const minDBWriteSize = 1000
 
 var dbWriteSize float64 = 20000
@@ -72,7 +72,7 @@ var collisionCounter []float64
 
 var clientsConnected int
 
-var maxTimePerRound time.Duration = 10 * time.Second
+var maxTimePerRound time.Duration = 30 * time.Second
 
 var clientsServedPhase1 []int
 var clientsServedPhase3 []int
@@ -85,11 +85,12 @@ var startPhase3 time.Time
 var round int = 0
 
 var startTime time.Time
+var startTimeRound time.Time
 
 //channel for goroutine communication with clients
-var waiCthannel = make(chan net.Conn, maxNumberOfClients)
+var waitChannel = make(chan net.Conn, maxNumberOfClients)
 
-//var waiCthannel = make(chan net.Conn, maxNumberOfClients)
+//var waitChannel = make(chan net.Conn, maxNumberOfClients)
 
 //variables for calculating the dbWrite size
 const publisherRounds int = 10
@@ -103,6 +104,8 @@ func main() {
 	//prevents race conditions for wrtiting
 	m := &sync.RWMutex{}
 
+	startTime = time.Now()
+
 	clientsServedPhase1 = make([]int, 1000)
 	clientsServedPhase3 = make([]int, 1000)
 
@@ -234,7 +237,7 @@ func main() {
 				break
 			}
 
-			_, err = clientConnection.Write(intToByte(startTime.Second()))
+			_, err = clientConnection.Write(intToByte(int(startTime.Unix())))
 			if err != nil {
 				fmt.Println("error", err)
 				clientConnection.Close()
@@ -255,7 +258,7 @@ func main() {
 			clientData[remoteAddress] = keys
 			m.Unlock()
 
-			waiCthannel <- clientConnection
+			waitChannel <- clientConnection
 			clientsConnected++
 
 			if clientsConnected%1000 == 0 {
@@ -270,8 +273,8 @@ func main() {
 	phase := make([]byte, 1)
 
 	for {
-		startTime = time.Now()
 		startPhase1 = time.Now()
+		startTimeRound = time.Now()
 
 		phase[0] = 1
 
@@ -304,7 +307,7 @@ func main() {
 			}
 			followerConnection.SetDeadline(time.Time{})
 
-			go phase1(id, phase, followerConnection, wg, m, startTime, virtualAddresses)
+			go phase1(id, phase, followerConnection, wg, m, virtualAddresses)
 		}
 
 		wg.Wait()
@@ -336,7 +339,7 @@ func main() {
 		}
 
 		phase[0] = 3
-		startTime = time.Now()
+		startTimeRound = time.Now()
 
 		for id := 0; id < numThreads; id++ {
 			wg.Add(1)
@@ -346,7 +349,7 @@ func main() {
 			}
 			followerConnection.SetDeadline(time.Time{})
 
-			go phase3(id, phase, followerConnection, wg, startTime, m)
+			go phase3(id, phase, followerConnection, wg, m)
 		}
 
 		wg.Wait()
@@ -357,15 +360,15 @@ func main() {
 	}
 }
 
-func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGroup, m *sync.RWMutex, startTime time.Time, virtualAddresses []int) {
+func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGroup, m *sync.RWMutex, virtualAddresses []int) {
 
 	roundAsBytes := intToByte(round)
 	gotClient := make([]byte, 1)
 	gotClient[0] = 0
 
 	//wait until time is up
-	for len(waiCthannel) == 0 {
-		if time.Since(startTime) > maxTimePerRound {
+	for len(waitChannel) == 0 {
+		if time.Since(startTimeRound) > maxTimePerRound {
 			//tells follower that this worker is done
 			writeTo(followerConnection, gotClient)
 			wg.Done()
@@ -374,7 +377,7 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		time.Sleep(1 * time.Second)
 	}
 
-	for clientConnection := range waiCthannel {
+	for clientConnection := range waitChannel {
 		clientsServedPhase1[round] = clientsServedPhase1[round] + 1
 
 		if clientsServedPhase1[round]%1000 == 0 {
@@ -402,7 +405,7 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		//tells client that phase 1 has begun
 		errorBool := writeToWError(clientConnection, phase, followerConnection, 5)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, waiCthannel)
+			contBool := handleClientDC(wg, followerConnection, waitChannel)
 			if contBool {
 				continue
 			} else {
@@ -413,7 +416,7 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		//tells client current dbWriteSize
 		errorBool = writeToWError(clientConnection, intToByte(int(dbWriteSize)), followerConnection, 5)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, waiCthannel)
+			contBool := handleClientDC(wg, followerConnection, waitChannel)
 			if contBool {
 				continue
 			} else {
@@ -424,7 +427,7 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		//tells client current round
 		errorBool = writeToWError(clientConnection, roundAsBytes, followerConnection, 5)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, waiCthannel)
+			contBool := handleClientDC(wg, followerConnection, waitChannel)
 			if contBool {
 				continue
 			} else {
@@ -439,7 +442,7 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		m.RUnlock()
 		clientKeys, pirQuery, errorBool := handlePirQuery(clientKeys, clientConnection, followerConnection, 0, true)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, waiCthannel)
+			contBool := handleClientDC(wg, followerConnection, waitChannel)
 			if contBool {
 				continue
 			} else {
@@ -448,7 +451,7 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		}
 		errorBool = getSendVirtualAddress(pirQuery[0], virtualAddresses, clientKeys.SharedSecret, clientConnection, followerConnection)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, waiCthannel)
+			contBool := handleClientDC(wg, followerConnection, waitChannel)
 			if contBool {
 				continue
 			} else {
@@ -467,7 +470,7 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		//accept dpfQuery from client
 		dpfLengthBytes, errorBool := readFrom(clientConnection, 4, followerConnection, 5)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, waiCthannel)
+			contBool := handleClientDC(wg, followerConnection, waitChannel)
 			if contBool {
 				continue
 			} else {
@@ -479,7 +482,7 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 
 		dpfQueryAEncrypted, errorBool := readFrom(clientConnection, dpfLength, followerConnection, 5)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, waiCthannel)
+			contBool := handleClientDC(wg, followerConnection, waitChannel)
 			if contBool {
 				continue
 			} else {
@@ -489,7 +492,7 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 
 		dpfQueryBEncrypted, errorBool := readFrom(clientConnection, dpfLength, followerConnection, 5)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, waiCthannel)
+			contBool := handleClientDC(wg, followerConnection, waitChannel)
 			if contBool {
 				continue
 			} else {
@@ -548,14 +551,14 @@ func phase1(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 				C.xorIn(C.int(i), (*C.uchar)(&db[i][0]))
 				m.Unlock()
 			}
-			waiCthannel <- clientConnection
+			waitChannel <- clientConnection
 		}
 
 		//loop that waits for new client or leaves phase1 if time is up
 		for {
-			if time.Since(startTime) < maxTimePerRound {
+			if time.Since(startTimeRound) < maxTimePerRound {
 				//this worker handles the next client
-				if len(waiCthannel) > 0 {
+				if len(waitChannel) > 0 {
 					break
 					//this worker waits for next client
 				} else {
@@ -685,7 +688,8 @@ func phase2(followerConnection net.Conn) {
 							topic = ""
 						} else {
 							//change this works for ints, need to be changed for strings
-							topic = topic + fmt.Sprint((int(letter)))
+							//have
+							topic = topic + string(letter)
 						}
 					} else if position == 1 {
 						text = text + string(letter)
@@ -764,14 +768,14 @@ func addTestTweets() {
 }
 
 //opti! mb it is quicker to send updated topicLists to clients first so pirQuerys are ready
-func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGroup, startTime time.Time, m *sync.RWMutex) {
+func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGroup, m *sync.RWMutex) {
 
 	gotClient := make([]byte, 1)
 	gotClient[0] = 0
 
 	//wait until time is up
-	for len(waiCthannel) == 0 {
-		if time.Since(startTime) > maxTimePerRound*2 {
+	for len(waitChannel) == 0 {
+		if time.Since(startTimeRound) > maxTimePerRound*2 {
 			//tells follower that this worker is done
 			writeToWError(followerConnection, gotClient, nil, 0)
 			wg.Done()
@@ -780,7 +784,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		time.Sleep(1 * time.Second)
 	}
 
-	for clientConnection := range waiCthannel {
+	for clientConnection := range waitChannel {
 		clientsServedPhase3[round] = clientsServedPhase3[round] + 1
 
 		if clientsServedPhase3[round]%1000 == 0 {
@@ -795,7 +799,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		//tells client current phase
 		errorBool := writeToWError(clientConnection, phase, followerConnection, 2)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, waiCthannel)
+			contBool := handleClientDC(wg, followerConnection, waitChannel)
 			if contBool {
 				continue
 			} else {
@@ -832,7 +836,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		//tells client what leader expects
 		errorBool = writeToWError(clientConnection, subPhase, followerConnection, 2)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, waiCthannel)
+			contBool := handleClientDC(wg, followerConnection, waitChannel)
 			if contBool {
 				continue
 			} else {
@@ -857,7 +861,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		if subPhase[0] == 0 {
 			errorBool := sendTopicLists(clientConnection, followerConnection, false)
 			if errorBool {
-				contBool := handleClientDC(wg, followerConnection, waiCthannel)
+				contBool := handleClientDC(wg, followerConnection, waitChannel)
 				if contBool {
 					continue
 				} else {
@@ -866,7 +870,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 			}
 			clientKeys, _, errorBool = handlePirQuery(clientKeys, clientConnection, followerConnection, int(subPhase[0]), false)
 			if errorBool {
-				contBool := handleClientDC(wg, followerConnection, waiCthannel)
+				contBool := handleClientDC(wg, followerConnection, waitChannel)
 				if contBool {
 					continue
 				} else {
@@ -876,7 +880,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		} else if subPhase[0] == 1 {
 			errorBool := sendTopicLists(clientConnection, followerConnection, false)
 			if errorBool {
-				contBool := handleClientDC(wg, followerConnection, waiCthannel)
+				contBool := handleClientDC(wg, followerConnection, waitChannel)
 				if contBool {
 					continue
 				} else {
@@ -889,7 +893,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 
 			clientKeys, _, errorBool = handlePirQuery(clientKeys, clientConnection, followerConnection, int(subPhase[0]), false)
 			if errorBool {
-				contBool := handleClientDC(wg, followerConnection, waiCthannel)
+				contBool := handleClientDC(wg, followerConnection, waitChannel)
 				if contBool {
 					continue
 				} else {
@@ -900,7 +904,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 
 		errorBool = getSendTweets(clientKeys, nil, clientConnection, followerConnection, m)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, waiCthannel)
+			contBool := handleClientDC(wg, followerConnection, waitChannel)
 			if contBool {
 				continue
 			} else {
@@ -910,7 +914,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 
 		wantsArchive, errorBool := readFrom(clientConnection, 1, followerConnection, 2)
 		if errorBool {
-			contBool := handleClientDC(wg, followerConnection, waiCthannel)
+			contBool := handleClientDC(wg, followerConnection, waitChannel)
 			if contBool {
 				continue
 			} else {
@@ -923,7 +927,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		if wantsArchive[0] == 1 && archiveTopicAmount > 0 {
 			_, archiveQuerys, errorBool := handlePirQuery(clientKeys, clientConnection, followerConnection, -1, false)
 			if errorBool {
-				contBool := handleClientDC(wg, followerConnection, waiCthannel)
+				contBool := handleClientDC(wg, followerConnection, waitChannel)
 				if contBool {
 					continue
 				} else {
@@ -932,7 +936,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 			}
 			errorBool = getSendTweets(clientKeys, archiveQuerys, clientConnection, followerConnection, m)
 			if errorBool {
-				contBool := handleClientDC(wg, followerConnection, waiCthannel)
+				contBool := handleClientDC(wg, followerConnection, waitChannel)
 				if contBool {
 					continue
 				} else {
@@ -946,12 +950,12 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 		clientData[clientConnection.RemoteAddr()] = clientKeys
 		m.Unlock()
 
-		waiCthannel <- clientConnection
+		waitChannel <- clientConnection
 
 		for {
-			if time.Since(startTime) < 2*maxTimePerRound {
+			if time.Since(startTimeRound) < 2*maxTimePerRound {
 				//this worker handles the next client
-				if len(waiCthannel) > 0 {
+				if len(waitChannel) > 0 {
 					break
 					//this worker waits for next client
 				} else {
@@ -973,7 +977,7 @@ func phase3(id int, phase []byte, followerConnection net.Conn, wg *sync.WaitGrou
 func handleClientDC(wg *sync.WaitGroup, followerConnection net.Conn, channel chan net.Conn) bool {
 	//loop that waits for new client or leaves phase1 if time is up
 	for {
-		if time.Since(startTime) < maxTimePerRound {
+		if time.Since(startTimeRound) < maxTimePerRound {
 			//this worker handles the next client
 			if len(channel) > 0 {
 				return true

+ 5 - 0
lib/databaseRead.go

@@ -3,6 +3,7 @@ package lib
 import (
 	"bytes"
 	"encoding/json"
+	"fmt"
 )
 
 //topicPointer and textPointer should not be exported
@@ -148,7 +149,11 @@ func tweetsToByteArray(tweetsToReturn [][]Tweet) []byte {
 		//adds padding
 		tweetToAppend = append(tweetToAppend, []byte(";;")[:]...)
 		length := minimumBlockSize - len(tweetToAppend)
+		//fmt.Println("len", len(tweetToAppend))
 		//todo! replace with grouping
+		if length < 0 {
+			fmt.Println("<0", string(tweetToAppend))
+		}
 		padding := bytes.Repeat([]byte(";"), length)
 		tweetToAppend = append(tweetToAppend, padding...)
 		Xor(tweetToAppend, tweetsAsBytes)