// Skeleton to part 8 of the Whispering Gophers code lab. // // This program extends part 7. // // It connects to the peer specified by -peer. // It accepts connections from peers and receives messages from them. // When it sees a peer with an address it hasn't seen before, it opens a // connection to that peer. // package main import ( "bufio" "encoding/json" "flag" "fmt" "log" "net" "os" "sync" "github.com/campoy/whispering-gophers/util" ) var ( peerAddr = flag.String("peer", "", "peer host:port") self string ) type Message struct { Addr string Body string } func main() { flag.Parse() l, err := util.Listen() if err != nil { log.Fatal(err) } self = l.Addr().String() log.Println("Listening on", self) go dial(*peerAddr) go readInput() for { c, err := l.Accept() if err != nil { log.Fatal(err) } go serve(c) } } // TODO: create a global shared Peers instance type Peers struct { m map[string]chan<- Message mu sync.RWMutex } // Add creates and returns a new channel for the given peer address. // If an address already exists in the registry, it returns nil. func (p *Peers) Add(addr string) <-chan Message { p.mu.Lock() defer p.mu.Unlock() if _, ok := p.m[addr]; ok { return nil } ch := make(chan Message) p.m[addr] = ch return ch } // Remove deletes the specified peer from the registry. func (p *Peers) Remove(addr string) { p.mu.Lock() defer p.mu.Unlock() delete(p.m, addr) } // List returns a slice of all active peer channels. func (p *Peers) List() []chan<- Message { p.mu.RLock() defer p.mu.RUnlock() l := make([]chan<- Message, 0, len(p.m)) for _, ch := range p.m { l = append(l, ch) } return l } func broadcast(m Message) { for /* TODO: Range over the list of peers */ { // TODO: Send a message to the channel, but don't block. // Hint: Select is your friend. } } func serve(c net.Conn) { defer c.Close() d := json.NewDecoder(c) for { var m Message err := d.Decode(&m) if err != nil { log.Println(err) return } // TODO: Launch dial in a new goroutine, to connect to the address in the message's Addr field. fmt.Printf("%#v\n", m) } } func readInput() { s := bufio.NewScanner(os.Stdin) for s.Scan() { m := Message{ Addr: self, Body: s.Text(), } broadcast(m) } if err := s.Err(); err != nil { log.Fatal(err) } } func dial(addr string) { // TODO: If dialing self, return. // TODO: Add the address to the peers map. // TODO: If you get a nil channel the peer is already connected, return. // TODO: Remove the address from peers map when this function returns // (use defer). c, err := net.Dial("tcp", addr) if err != nil { log.Println(addr, err) return } defer c.Close() e := json.NewEncoder(c) for m := range ch { err := e.Encode(m) if err != nil { log.Println(addr, err) return } } }