client.go 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588
  1. package main
  2. /*
  3. #cgo CFLAGS: -O2
  4. #cgo LDFLAGS: -lcrypto -lm
  5. #include "../c/dpf.h"
  6. #include "../c/okvClient.h"
  7. #include "../c/dpf.c"
  8. #include "../c/okvClient.c"
  9. */
  10. import "C"
  11. //sssssssssss
import (
	"bytes"
	"crypto/rand"
	"crypto/sha256"
	"crypto/tls"
	"encoding/json"
	"fmt"
	"io"
	"math/big"
	mr "math/rand"
	"net"
	"sort"
	"sync"
	"time"
	"unsafe"

	lib "2PPS/lib"
	"golang.org/x/crypto/nacl/box"
)
  29. type tweet struct {
  30. Topics []string
  31. Text string
  32. }
  33. const leader string = "127.0.0.1:4441"
  34. //needs to be changed at leader/follower/client at the same time
  35. const neededSubscriptions = 1
  36. const numClients = 1
  37. var topicList []string
  38. var archiveTopicList []string
  39. //todo! expand this for multiple clients
  40. var archiveInterests = make([]int, 1)
  41. var sharedSecret [numClients][2][32]byte = createSharedSecret()
  42. var wantsArchive = make([]byte, 1)
  43. var dataLen int = 32
  44. var numThreads int = 12
  45. var dbWriteSize int = 2
  46. var leaderPublicKey *[32]byte
  47. var followerPublicKey *[32]byte
  48. var clientPrivateKey *[32]byte
  49. var clientPublicKey *[32]byte
  50. func main() {
  51. //creates test tweets
  52. tweets := make([][]byte, numClients)
  53. for i := range tweets {
  54. tweets[i] = make([]byte, dataLen)
  55. }
  56. for i := 0; i < numClients; i++ {
  57. var tweet []byte
  58. if i == 0 {
  59. topics := []byte("house; mouse")
  60. text := []byte("I am a house in a mouse;")
  61. tweet = append(tweet, topics...)
  62. tweet = append(tweet, text...)
  63. }
  64. length := len(tweet)
  65. for i := length; i < dataLen; i++ {
  66. tweet = append(tweet, []byte(";")[0])
  67. }
  68. tweets[i] = tweet
  69. }
  70. wg := &sync.WaitGroup{}
  71. for i := 0; i < numClients; i++ {
  72. wg.Add(1)
  73. go client(tweets[i], i)
  74. }
  75. wg.Wait()
  76. }
  77. func client(tweet []byte, clientNumber int) {
  78. /*
  79. if len(os.Args) != 4 {
  80. fmt.Println("try again with: numThreads, dataLength, numRows")
  81. return
  82. }
  83. //input when executing is follower amount
  84. serverAmount, _ = strconv.Atoi(os.Args[1])
  85. serverAmount++
  86. dataLen, _ = strconv.Atoi(os.Args[2])
  87. numThreads, _ = strconv.Atoi(os.Args[3])
  88. dbSize, _ = strconv.Atoi(os.Args[4])
  89. */
  90. generatedPublicKey, generatedPrivateKey, err := box.GenerateKey(rand.Reader)
  91. if err != nil {
  92. panic(err)
  93. }
  94. clientPrivateKey = generatedPrivateKey
  95. clientPublicKey = generatedPublicKey
  96. C.initializeCipher()
  97. //initializes the connection to the leader
  98. conf := &tls.Config{
  99. InsecureSkipVerify: true,
  100. }
  101. leaderConn, err := tls.Dial("tcp", leader, conf)
  102. if err != nil {
  103. panic(err)
  104. }
  105. leaderConn.SetDeadline(time.Time{})
  106. //receives topics first so client can participate asap
  107. receiveTopicLists(leaderConn)
  108. //gets the public keys of both servers
  109. var tmpLeaderPubKey [32]byte
  110. _, err = leaderConn.Read(tmpLeaderPubKey[:])
  111. if err != nil {
  112. panic(err)
  113. }
  114. leaderPublicKey = &tmpLeaderPubKey
  115. var tmpFollowerPubKey [32]byte
  116. _, err = leaderConn.Read(tmpFollowerPubKey[:])
  117. if err != nil {
  118. panic(err)
  119. }
  120. followerPublicKey = &tmpFollowerPubKey
  121. //sends own public key
  122. _, err = leaderConn.Write(clientPublicKey[:])
  123. if err != nil {
  124. panic(err)
  125. }
  126. //setup ends above
  127. //while client is active he is always connected and has to participate
  128. for {
  129. //gets current phase
  130. phase := make([]byte, 1)
  131. _, err = leaderConn.Read(phase)
  132. if err != nil {
  133. panic(err)
  134. }
  135. fmt.Println("Phase: ", phase[0])
  136. if phase[0] == 1 {
  137. //gets current dbWriteSize from leader
  138. dbWriteSizeBytes := make([]byte, 4)
  139. _, err = leaderConn.Read(dbWriteSizeBytes)
  140. if err != nil {
  141. panic(err)
  142. }
  143. dbWriteSize = byteToInt(dbWriteSizeBytes)
  144. //todo! put into tweet creation
  145. roundAsBytes := make([]byte, 4)
  146. _, err = leaderConn.Read(roundAsBytes)
  147. if err != nil {
  148. panic(err)
  149. }
  150. //prep the query
  151. dataSize := len(tweet)
  152. querySize := make([]byte, 4)
  153. cQuerySize := C.int(byteToInt(querySize))
  154. var dpfQueryA *C.uchar
  155. var dpfQueryB *C.uchar
  156. //change
  157. C.prepQuery(C.int(1), C.int(dbWriteSize), (*C.uchar)(&tweet[0]), C.int(dataSize), &cQuerySize, &dpfQueryA, &dpfQueryB)
  158. intQuerySize := int(cQuerySize) //byteToInt(querySize)
  159. //write the query
  160. queryAPlaintext := C.GoBytes(unsafe.Pointer(dpfQueryA), C.int(intQuerySize))
  161. //encrypts queryA and appends it to message
  162. var nonce [24]byte
  163. //fill nonce with randomness
  164. _, err = rand.Read(nonce[:])
  165. if err != nil {
  166. panic("couldn't get randomness for nonce!")
  167. }
  168. dpfQueryAEncrypted := box.Seal(nonce[:], queryAPlaintext, &nonce, leaderPublicKey, clientPrivateKey)
  169. //encrypts queryB and appends it to message
  170. queryBPlaintext := C.GoBytes(unsafe.Pointer(dpfQueryB), C.int(intQuerySize))
  171. //fill nonce with randomness
  172. _, err = rand.Read(nonce[:])
  173. if err != nil {
  174. panic("couldn't get randomness for nonce!")
  175. }
  176. dpfQueryBEncrypted := box.Seal(nonce[:], queryBPlaintext, &nonce, followerPublicKey, clientPrivateKey)
  177. //writes the dpfQuery to the leader
  178. dpfLengthBytes := intToByte(len(dpfQueryAEncrypted))
  179. _, err = leaderConn.Write(dpfLengthBytes)
  180. if err != nil {
  181. panic(err)
  182. }
  183. _, err = leaderConn.Write(dpfQueryAEncrypted)
  184. if err != nil {
  185. panic(err)
  186. }
  187. _, err = leaderConn.Write(dpfQueryBEncrypted)
  188. if err != nil {
  189. panic(err)
  190. }
  191. C.free(unsafe.Pointer(dpfQueryA))
  192. C.free(unsafe.Pointer(dpfQueryB))
  193. } else if phase[0] == 3 {
  194. /*
  195. possible Values
  196. 0 : new client
  197. leader expects sharedSecrets, expects pirQuery
  198. 1 : update needed
  199. leader sends topicList, performs local update of sharedSecret, expects pirQuery
  200. 2 : no update needed
  201. nothing
  202. */
  203. subPhase := make([]byte, 1)
  204. _, err := leaderConn.Read(subPhase)
  205. if err != nil {
  206. panic(err)
  207. }
  208. var encryptedQueryLeader, encryptedQueryFollower []byte
  209. //first time participating
  210. if subPhase[0] == 0 {
  211. receiveTopicLists(leaderConn)
  212. encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(int(subPhase[0]), clientNumber)
  213. sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false)
  214. }
  215. //updates the topic list and what client is interested in
  216. if subPhase[0] == 1 {
  217. receiveTopicLists(leaderConn)
  218. //updates local secret
  219. for index := 0; index < 2; index++ {
  220. sharedSecret[clientNumber][index] = sha256.Sum256(sharedSecret[clientNumber][index][:])
  221. }
  222. encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(int(subPhase[0]), clientNumber)
  223. sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, false)
  224. }
  225. receiveTweets(sharedSecret[clientNumber], leaderConn, clientNumber, false)
  226. if len(archiveTopicList) > 0 {
  227. wantsArchive[0] = 0 //archive test
  228. } else {
  229. wantsArchive[0] = 0
  230. }
  231. _, err = leaderConn.Write(wantsArchive)
  232. if err != nil {
  233. panic(err)
  234. }
  235. if wantsArchive[0] == 1 && len(archiveTopicList) > 0 {
  236. encryptedQueryLeader, encryptedQueryFollower = createPIRQuery(-1, clientNumber)
  237. sendQuerys(encryptedQueryLeader, encryptedQueryFollower, leaderConn, true)
  238. receiveTweets(sharedSecret[clientNumber], leaderConn, clientNumber, true)
  239. }
  240. } else {
  241. panic("somethin went wrong")
  242. }
  243. }
  244. }
  245. //creates and sends the pirQuerys for each server
  246. func createPIRQuery(subPhase int, clientNumber int) ([]byte, []byte) {
  247. //later this will be taken from gui, this is only for testing
  248. topicsOfInterest := make([]int, 1)
  249. topicsOfInterest[0] = 1
  250. archiveInterests[0] = 1
  251. tmptopicsOfInterest := make([]int, len(topicsOfInterest))
  252. copy(tmptopicsOfInterest, topicsOfInterest)
  253. tmpNeededSubscriptions := neededSubscriptions
  254. tmpTopicList := make([]string, len(topicList))
  255. copy(tmpTopicList, topicList)
  256. if wantsArchive[0] == 1 && subPhase == -1 {
  257. tmpNeededSubscriptions = len(archiveInterests)
  258. copy(tmptopicsOfInterest, archiveInterests) //archiveInterests from gui
  259. copy(tmpTopicList, archiveTopicList)
  260. }
  261. topicsOfInterestAsBytes := make([][]byte, tmpNeededSubscriptions)
  262. for i := range topicsOfInterestAsBytes {
  263. topicsOfInterestAsBytes[i] = make([]byte, len(tmpTopicList))
  264. }
  265. //creates fake topicsOfInterest if client is boooring
  266. if len(tmptopicsOfInterest) < tmpNeededSubscriptions && subPhase != -1 {
  267. tmptopicsOfInterest = addFakeInterests(tmptopicsOfInterest)
  268. }
  269. for topic, position := range tmptopicsOfInterest {
  270. topicsOfInterestAsBytes[topic][position-1] = 1
  271. }
  272. //this for N servers for one topic
  273. /* interested in topic 3, 6 servers
  274. this needs to be repeated for every topic client is interested in
  275. wanted
  276. [0, 0, 1]
  277. random creation of 1 pirQuery
  278. [0, 1, 0]
  279. manual creation of 1 pirQuery
  280. [0, 1, 1]
  281. xor result
  282. [0, 0, 1]
  283. */
  284. //pirQuery [serverAmount][topicsofinterest][topicAmount]byte
  285. pirQuerys := make([][][]byte, 2)
  286. for i := range pirQuerys {
  287. pirQuerys[i] = make([][]byte, len(tmptopicsOfInterest))
  288. for j := range pirQuerys[i] {
  289. pirQuerys[i][j] = make([]byte, len(tmpTopicList))
  290. }
  291. }
  292. //for leader
  293. //pirQuery will be filled with random bits
  294. for topic := range tmptopicsOfInterest {
  295. for index := range tmpTopicList {
  296. bit, err := rand.Int(rand.Reader, big.NewInt(2))
  297. if err != nil {
  298. panic(err)
  299. }
  300. pirQuerys[0][topic][index] = byte(bit.Int64())
  301. }
  302. }
  303. //creating last manually with result and wanted
  304. //if position random result correct -> 0, not correct -> 1
  305. for topic := range tmptopicsOfInterest {
  306. for index := range tmpTopicList {
  307. if pirQuerys[0][topic][index] == topicsOfInterestAsBytes[topic][index] {
  308. pirQuerys[1][topic][index] = 0
  309. } else {
  310. pirQuerys[1][topic][index] = 1
  311. }
  312. }
  313. }
  314. //flattens the querys to be able to send them more efficently
  315. messagesFlattened := make([][]byte, 2)
  316. //adds the sharedSecret to the first pirQuery when first time participating
  317. if subPhase == 0 {
  318. for server := 0; server < 2; server++ {
  319. messagesFlattened[server] = append(messagesFlattened[server], sharedSecret[clientNumber][server][:]...)
  320. }
  321. }
  322. for server := 0; server < 2; server++ {
  323. for topic := range pirQuerys[server] {
  324. messagesFlattened[server] = append(messagesFlattened[server], pirQuerys[server][topic][:]...)
  325. }
  326. }
  327. var nonce [24]byte
  328. _, err := rand.Read(nonce[:])
  329. if err != nil {
  330. panic("couldn't get randomness for nonce!")
  331. }
  332. encryptedQueryLeader := box.Seal(nonce[:], messagesFlattened[0], &nonce, leaderPublicKey, clientPrivateKey)
  333. _, err = rand.Read(nonce[:])
  334. if err != nil {
  335. panic("couldn't get randomness for nonce!")
  336. }
  337. encryptedQueryFollower := box.Seal(nonce[:], messagesFlattened[1], &nonce, followerPublicKey, clientPrivateKey)
  338. return encryptedQueryLeader, encryptedQueryFollower
  339. }
  340. func sendQuerys(encryptedQueryLeader, encryptedQueryFollower []byte, leaderConn net.Conn, getArchive bool) {
  341. encryptedLength := len(encryptedQueryLeader)
  342. //sends the pirQuerysLength to the leader
  343. _, err := leaderConn.Write(intToByte(encryptedLength))
  344. if err != nil {
  345. panic(err)
  346. }
  347. //sends the pirQuerys to the leader
  348. _, err = leaderConn.Write(encryptedQueryLeader)
  349. if err != nil {
  350. panic(err)
  351. }
  352. _, err = leaderConn.Write(encryptedQueryFollower)
  353. if err != nil {
  354. panic(err)
  355. }
  356. if getArchive {
  357. leaderConn.Write(intToByte(len(archiveInterests)))
  358. if err != nil {
  359. panic(err)
  360. }
  361. }
  362. }
  363. func receiveTweets(sharedSecret [2][32]byte, leaderConn net.Conn, clientNumber int, getArchive bool) {
  364. tmpNeededSubscriptions := neededSubscriptions
  365. if getArchive {
  366. tmpNeededSubscriptions = len(archiveInterests)
  367. }
  368. for i := 0; i < tmpNeededSubscriptions; i++ {
  369. //client receives tweets
  370. tweetsLengthBytes := make([]byte, 4)
  371. _, err := leaderConn.Read(tweetsLengthBytes)
  372. if err != nil {
  373. panic(err)
  374. }
  375. tweetsLength := byteToInt(tweetsLengthBytes)
  376. tweets := make([]byte, tweetsLength)
  377. _, err = leaderConn.Read(tweets)
  378. if err != nil {
  379. panic(err)
  380. }
  381. //expand sharedSecret so it is of right length
  382. expandBy := len(tweets) / 32
  383. expandedSharedSecrets := make([][]byte, 2)
  384. for i := 0; i < 2; i++ {
  385. for j := 0; j < expandBy; j++ {
  386. expandedSharedSecrets[i] = append(expandedSharedSecrets[i], sharedSecret[i][:]...)
  387. }
  388. //fmt.Println(expandedSharedSecrets[i])
  389. }
  390. //xors the received messge into the message to display
  391. for i := 0; i < 2; i++ {
  392. lib.Xor(expandedSharedSecrets[i][:], tweets)
  393. }
  394. //tweets can be displayed
  395. fmt.Println("final result: ", string(tweets))
  396. }
  397. }
  398. //creates a shared secret for each server
  399. func createSharedSecret() [numClients][2][32]byte {
  400. var tmpSharedSecret [numClients][2][32]byte
  401. for i := 0; i < numClients; i++ {
  402. for j := 0; j < 2; j++ {
  403. _, err := rand.Read(tmpSharedSecret[i][j][:])
  404. if err != nil {
  405. panic("couldn't get randomness for sharedSecret!")
  406. }
  407. }
  408. }
  409. return tmpSharedSecret
  410. }
  411. //generates a topicOfInterest array with random values
  412. //todo! fix
  413. func addFakeInterests(topicsOfInterest []int) []int {
  414. fakeTopicsOfInterest := make([]int, neededSubscriptions)
  415. maxInt := neededSubscriptions
  416. //fills the array with unique random ascending values in range with len(topicList)
  417. for i := 0; i < neededSubscriptions; i++ {
  418. fakeTopicsOfInterest[i] = mr.Intn(maxInt) + 1
  419. for j := 0; j < i; j++ {
  420. if fakeTopicsOfInterest[i] == fakeTopicsOfInterest[j] {
  421. i--
  422. break
  423. }
  424. }
  425. }
  426. sort.Ints(fakeTopicsOfInterest)
  427. fmt.Println(fakeTopicsOfInterest)
  428. //adds unique and new random numbers to topicOfInterests until length is satisfied
  429. for _, number := range fakeTopicsOfInterest {
  430. if !inList(number, topicsOfInterest) {
  431. topicsOfInterest = append(topicsOfInterest, number)
  432. }
  433. if len(topicsOfInterest) == neededSubscriptions {
  434. break
  435. }
  436. }
  437. fmt.Println(topicsOfInterest)
  438. return topicsOfInterest
  439. }
  440. func inList(number int, list []int) bool {
  441. for _, element := range list {
  442. if element == number {
  443. return true
  444. }
  445. }
  446. return false
  447. }
  448. func receiveTopicLists(leaderConn net.Conn) {
  449. for i := 0; i < 2; i++ {
  450. topicListLength := make([]byte, 4)
  451. _, err := leaderConn.Read(topicListLength)
  452. if err != nil {
  453. panic(err)
  454. }
  455. recTopicList := make([]byte, byteToInt(topicListLength))
  456. _, err = leaderConn.Read(recTopicList[:])
  457. if err != nil {
  458. panic(err)
  459. }
  460. var tmpTopicList []string
  461. arrayReader := bytes.NewReader(recTopicList[:])
  462. json.NewDecoder(arrayReader).Decode(&tmpTopicList)
  463. if i == 0 {
  464. topicList = tmpTopicList
  465. } else {
  466. archiveTopicList = tmpTopicList
  467. }
  468. }
  469. }
  470. func intToByte(myInt int) (retBytes []byte) {
  471. retBytes = make([]byte, 4)
  472. retBytes[3] = byte((myInt >> 24) & 0xff)
  473. retBytes[2] = byte((myInt >> 16) & 0xff)
  474. retBytes[1] = byte((myInt >> 8) & 0xff)
  475. retBytes[0] = byte(myInt & 0xff)
  476. return
  477. }
  478. func byteToInt(myBytes []byte) (x int) {
  479. x = int(myBytes[3])<<24 + int(myBytes[2])<<16 + int(myBytes[1])<<8 + int(myBytes[0])
  480. return
  481. }