main.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. // Skeleton to part 8 of the Whispering Gophers code lab.
  2. //
  3. // This program extends part 7.
  4. //
  5. // It connects to the peer specified by -peer.
  6. // It accepts connections from peers and receives messages from them.
  7. // When it sees a peer with an address it hasn't seen before, it opens a
  8. // connection to that peer.
  9. //
  10. package main
  11. import (
  12. "bufio"
  13. "encoding/json"
  14. "flag"
  15. "fmt"
  16. "log"
  17. "net"
  18. "os"
  19. "sync"
  20. "code.osinet.fr/fgm/whispering_gophers/util"
  21. )
  22. var (
  23. listenAddr = flag.String("listen", "", "peer host:port")
  24. peerAddr = flag.String("peer", "", "peer host:port")
  25. self string
  26. )
  27. type Message struct {
  28. Addr string
  29. Body string
  30. }
  31. func main() {
  32. flag.Parse()
  33. var l net.Listener
  34. var err error
  35. // Create a new listener using util.Listen and put it in a variable named l.
  36. if *listenAddr == "" {
  37. l, err = util.ListenOnFirstUsableInterface()
  38. } else {
  39. l, err = net.Listen("tcp4", *listenAddr)
  40. }
  41. if err != nil {
  42. log.Fatal(err)
  43. }
  44. self = l.Addr().String()
  45. log.Println("Listening on", self)
  46. go dial(*peerAddr)
  47. go readInput()
  48. for {
  49. c, err := l.Accept()
  50. if err != nil {
  51. log.Fatal(err)
  52. }
  53. go serve(c)
  54. }
  55. }
  56. // Create a global shared Peers instance
  57. type Peers struct {
  58. m map[string]chan<- Message
  59. mu sync.RWMutex
  60. }
  61. var peers = Peers{
  62. m: make(map[string]chan<- Message),
  63. }
  64. // Add creates and returns a new channel for the given peer address.
  65. // If an address already exists in the registry, it returns nil.
  66. func (p *Peers) Add(addr string) <-chan Message {
  67. p.mu.Lock()
  68. defer p.mu.Unlock()
  69. if _, ok := p.m[addr]; ok {
  70. return nil
  71. }
  72. ch := make(chan Message)
  73. p.m[addr] = ch
  74. return ch
  75. }
  76. // Remove deletes the specified peer from the registry.
  77. func (p *Peers) Remove(addr string) {
  78. p.mu.Lock()
  79. defer p.mu.Unlock()
  80. delete(p.m, addr)
  81. }
  82. // List returns a slice of all active peer channels.
  83. func (p *Peers) List() []chan<- Message {
  84. p.mu.RLock()
  85. defer p.mu.RUnlock()
  86. l := make([]chan<- Message, 0, len(p.m))
  87. for _, ch := range p.m {
  88. l = append(l, ch)
  89. }
  90. return l
  91. }
  92. func broadcast(m Message) {
  93. /* Range over the list of peers */
  94. for i, peer := range peers.List() {
  95. // Send a message to the channel, but don't block.
  96. // Hint: Select is your friend.
  97. select {
  98. case peer <- m:
  99. default:
  100. log.Printf("Sending to peer %d would have blocked.\n", i)
  101. }
  102. }
  103. }
  104. func serve(c net.Conn) {
  105. defer c.Close()
  106. d := json.NewDecoder(c)
  107. for {
  108. var m Message
  109. err := d.Decode(&m)
  110. if err != nil {
  111. log.Println(err)
  112. return
  113. }
  114. // Launch dial in a new goroutine, to connect to the address in the message's Addr field.
  115. go dial(m.Addr)
  116. fmt.Printf("%#v\n", m)
  117. }
  118. }
  119. func readInput() {
  120. s := bufio.NewScanner(os.Stdin)
  121. for s.Scan() {
  122. m := Message{
  123. Addr: self,
  124. Body: s.Text(),
  125. }
  126. broadcast(m)
  127. }
  128. if err := s.Err(); err != nil {
  129. log.Fatal(err)
  130. }
  131. os.Exit(0)
  132. }
  133. func dial(addr string) {
  134. // If dialing self, return.
  135. if addr == self {
  136. return
  137. }
  138. // Add the address to the peers map.
  139. ch := peers.Add(addr)
  140. // If you get a nil channel the peer is already connected, return.
  141. if ch == nil {
  142. return
  143. }
  144. // Remove the address from peers map when this function returns
  145. defer peers.Remove(addr)
  146. c, err := net.Dial("tcp", addr)
  147. if err != nil {
  148. log.Println(addr, err)
  149. return
  150. }
  151. defer c.Close()
  152. e := json.NewEncoder(c)
  153. for m := range ch {
  154. err := e.Encode(m)
  155. if err != nil {
  156. log.Println(addr, err)
  157. return
  158. }
  159. }
  160. }