// Skeleton to part 8 of the Whispering Gophers code lab. // // This program extends part 7. // // It connects to the peer specified by -peer. // It accepts connections from peers and receives messages from them. // When it sees a peer with an address it hasn't seen before, it opens a // connection to that peer. // package main import ( "bufio" "encoding/json" "flag" "fmt" "log" "net" "os" "sync" "code.osinet.fr/fgm/whispering_gophers/util" ) var ( listenAddr = flag.String("listen", "", "peer host:port") peerAddr = flag.String("peer", "", "peer host:port") self string ) type Message struct { Addr string Body string } func main() { flag.Parse() var l net.Listener var err error // Create a new listener using util.Listen and put it in a variable named l. if *listenAddr == "" { l, err = util.ListenOnFirstUsableInterface() } else { l, err = net.Listen("tcp4", *listenAddr) } if err != nil { log.Fatal(err) } self = l.Addr().String() log.Println("Listening on", self) go dial(*peerAddr) go readInput() for { c, err := l.Accept() if err != nil { log.Fatal(err) } go serve(c) } } // Create a global shared Peers instance type Peers struct { m map[string]chan<- Message mu sync.RWMutex } var peers = Peers{ m: make(map[string]chan<- Message), } // Add creates and returns a new channel for the given peer address. // If an address already exists in the registry, it returns nil. func (p *Peers) Add(addr string) <-chan Message { p.mu.Lock() defer p.mu.Unlock() if _, ok := p.m[addr]; ok { return nil } ch := make(chan Message) p.m[addr] = ch return ch } // Remove deletes the specified peer from the registry. func (p *Peers) Remove(addr string) { p.mu.Lock() defer p.mu.Unlock() delete(p.m, addr) } // List returns a slice of all active peer channels. func (p *Peers) List() []chan<- Message { p.mu.RLock() defer p.mu.RUnlock() l := make([]chan<- Message, 0, len(p.m)) for _, ch := range p.m { l = append(l, ch) } return l } func broadcast(m Message) { /* Range over the list of peers */ for i, peer := range peers.List() { // Send a message to the channel, but don't block. // Hint: Select is your friend. select { case peer <- m: default: log.Printf("Sending to peer %d would have blocked.\n", i) } } } func serve(c net.Conn) { defer c.Close() d := json.NewDecoder(c) for { var m Message err := d.Decode(&m) if err != nil { log.Println(err) return } // Launch dial in a new goroutine, to connect to the address in the message's Addr field. go dial(m.Addr) fmt.Printf("%#v\n", m) } } func readInput() { s := bufio.NewScanner(os.Stdin) for s.Scan() { m := Message{ Addr: self, Body: s.Text(), } broadcast(m) } if err := s.Err(); err != nil { log.Fatal(err) } os.Exit(0) } func dial(addr string) { // If dialing self, return. if addr == self { return } // Add the address to the peers map. ch := peers.Add(addr) // If you get a nil channel the peer is already connected, return. if ch == nil { return } // Remove the address from peers map when this function returns defer peers.Remove(addr) c, err := net.Dial("tcp", addr) if err != nil { log.Println(addr, err) return } defer c.Close() e := json.NewEncoder(c) for m := range ch { err := e.Encode(m) if err != nil { log.Println(addr, err) return } } }