main.go 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. // Skeleton to part 9 of the Whispering Gophers code lab.
  2. //
  3. // This program extends part 8.
  4. //
  5. // It connects to the peer specified by -peer.
  6. // It accepts connections from peers and receives messages from them.
  7. // When it sees a peer with an address it hasn't seen before, it makes a
  8. // connection to that peer.
  9. // It adds an ID field containing a random string to each outgoing message.
  10. // When it receives a message with an ID it hasn't seen before, it broadcasts
  11. // that message to all connected peers.
  12. //
  13. package main
  14. import (
  15. "bufio"
  16. "encoding/json"
  17. "flag"
  18. "fmt"
  19. "log"
  20. "net"
  21. "os"
  22. "sync"
  23. "code.osinet.fr/fgm/whispering_gophers/util"
  24. )
  25. var (
  26. listenAddr = flag.String("listen", "", "peer host:port")
  27. peerAddr = flag.String("peer", "", "peer host:port")
  28. self string
  29. )
  30. type Message struct {
  31. // Add ID field
  32. ID string
  33. Addr string
  34. Body string
  35. }
  36. func main() {
  37. flag.Parse()
  38. var l net.Listener
  39. var err error
  40. // Create a new listener using util.Listen and put it in a variable named l.
  41. if *listenAddr == "" {
  42. l, err = util.ListenOnFirstUsableInterface()
  43. } else {
  44. l, err = net.Listen("tcp4", *listenAddr)
  45. }
  46. if err != nil {
  47. log.Fatal(err)
  48. }
  49. self = l.Addr().String()
  50. log.Println("Listening on", self)
  51. go dial(*peerAddr)
  52. go readInput()
  53. for {
  54. c, err := l.Accept()
  55. if err != nil {
  56. log.Fatal(err)
  57. }
  58. go serve(c)
  59. }
  60. }
  61. var peers = &Peers{m: make(map[string]chan<- Message)}
  62. type Peers struct {
  63. m map[string]chan<- Message
  64. mu sync.RWMutex
  65. }
  66. // Add creates and returns a new channel for the given peer address.
  67. // If an address already exists in the registry, it returns nil.
  68. func (p *Peers) Add(addr string) <-chan Message {
  69. p.mu.Lock()
  70. defer p.mu.Unlock()
  71. if _, ok := p.m[addr]; ok {
  72. return nil
  73. }
  74. ch := make(chan Message)
  75. p.m[addr] = ch
  76. return ch
  77. }
  78. // Remove deletes the specified peer from the registry.
  79. func (p *Peers) Remove(addr string) {
  80. p.mu.Lock()
  81. defer p.mu.Unlock()
  82. delete(p.m, addr)
  83. }
  84. // List returns a slice of all active peer channels.
  85. func (p *Peers) List() []chan<- Message {
  86. p.mu.RLock()
  87. defer p.mu.RUnlock()
  88. l := make([]chan<- Message, 0, len(p.m))
  89. for _, ch := range p.m {
  90. l = append(l, ch)
  91. }
  92. return l
  93. }
  94. func broadcast(m Message) {
  95. for _, ch := range peers.List() {
  96. select {
  97. case ch <- m:
  98. default:
  99. // Okay to drop messages sometimes.
  100. }
  101. }
  102. }
  103. func serve(c net.Conn) {
  104. defer c.Close()
  105. d := json.NewDecoder(c)
  106. for {
  107. var m Message
  108. err := d.Decode(&m)
  109. if err != nil {
  110. log.Println(err)
  111. return
  112. }
  113. // If this message has seen before, ignore it.
  114. if Seen(m.ID) {
  115. continue
  116. }
  117. fmt.Printf("%#v\n", m)
  118. broadcast(m)
  119. go dial(m.Addr)
  120. }
  121. }
  122. func readInput() {
  123. s := bufio.NewScanner(os.Stdin)
  124. for s.Scan() {
  125. m := Message{
  126. // Use util.RandomID to populate the ID field.
  127. Addr: self,
  128. Body: s.Text(),
  129. ID: util.RandomID(),
  130. }
  131. Seen(m.ID)
  132. broadcast(m)
  133. }
  134. if err := s.Err(); err != nil {
  135. log.Fatal(err)
  136. }
  137. os.Exit(0)
  138. }
  139. func dial(addr string) {
  140. if addr == self {
  141. return // Don't try to dial self.
  142. }
  143. ch := peers.Add(addr)
  144. if ch == nil {
  145. return // Peer already connected.
  146. }
  147. defer peers.Remove(addr)
  148. c, err := net.Dial("tcp", addr)
  149. if err != nil {
  150. log.Println(addr, err)
  151. return
  152. }
  153. defer c.Close()
  154. e := json.NewEncoder(c)
  155. for m := range ch {
  156. err := e.Encode(m)
  157. if err != nil {
  158. log.Println(addr, err)
  159. return
  160. }
  161. }
  162. }
  163. // Create a new map of seen message IDs and a mutex to protect it.
  164. var seenIDs = struct {
  165. m map[string]bool
  166. sync.Mutex
  167. }{m: make(map[string]bool)}
  168. // Seen returns true if the specified id has been seen before.
  169. // If not, it returns false and marks the given id as "seen".
  170. func Seen(id string) bool {
  171. // Get a write lock on the seen message IDs map and unlock it at before returning.
  172. seenIDs.Lock()
  173. defer seenIDs.Unlock()
  174. // Check if the id has been seen before and return that later.
  175. ok := seenIDs.m[id]
  176. // Mark the ID as seen in the map.
  177. seenIDs.m[id] = true
  178. return ok
  179. }