// Solution to part 9 of the Whispering Gophers code lab. // // This program extends part 9. // // It connects to the peer specified by -peer. // It accepts connections from peers and receives messages from them. // When it sees a peer with an address it hasn't seen before, it makes a // connection to that peer. // It adds an ID field containing a random string to each outgoing message. // When it recevies a message with an ID it hasn't seen before, it broadcasts // that message to all connected peers. // package main import ( "bufio" "encoding/json" "flag" "fmt" "log" "net" "os" "sync" "code.osinet.fr/fgm/whispering_gophers/util" ) var ( selfAddr = flag.String("self", "", "self host") peerAddr = flag.String("peer", "", "peer host:port") self string ) type Message struct { ID string Addr string Body string } func main() { flag.Parse() var l net.Listener var err error if *selfAddr != "" { l, err = net.Listen("tcp4", *selfAddr + ":0") } else { l, err = util.ListenOnFirstUsableInterface() } if err != nil { log.Fatal(err) } self = l.Addr().String() log.Println("Listening on", self) go dial(*peerAddr) go readInput() for { c, err := l.Accept() if err != nil { log.Fatal(err) } go serve(c) } } var peers = &Peers{m: make(map[string]chan<- Message)} type Peers struct { m map[string]chan<- Message mu sync.RWMutex } // Add creates and returns a new channel for the given peer address. // If an address already exists in the registry, it returns nil. func (p *Peers) Add(addr string) <-chan Message { p.mu.Lock() defer p.mu.Unlock() if _, ok := p.m[addr]; ok { return nil } ch := make(chan Message) p.m[addr] = ch return ch } // Remove deletes the specified peer from the registry. func (p *Peers) Remove(addr string) { p.mu.Lock() defer p.mu.Unlock() delete(p.m, addr) } // List returns a slice of all active peer channels. func (p *Peers) List() []chan<- Message { p.mu.RLock() defer p.mu.RUnlock() l := make([]chan<- Message, 0, len(p.m)) for _, ch := range p.m { l = append(l, ch) } return l } func broadcast(m Message) { for _, ch := range peers.List() { select { case ch <- m: default: // Okay to drop messages sometimes. } } } func serve(c net.Conn) { defer c.Close() d := json.NewDecoder(c) for { var m Message err := d.Decode(&m) if err != nil { log.Println(err) return } if Seen(m.ID) { continue } fmt.Printf("%#v\n", m) broadcast(m) go dial(m.Addr) } } func readInput() { s := bufio.NewScanner(os.Stdin) for s.Scan() { m := Message{ ID: util.RandomID(), Addr: self, Body: s.Text(), } Seen(m.ID) broadcast(m) } if err := s.Err(); err != nil { log.Fatal(err) } os.Exit(0) } func dial(addr string) { if addr == self { return // Don't try to dial self. } ch := peers.Add(addr) if ch == nil { return // Peer already connected. } defer peers.Remove(addr) c, err := net.Dial("tcp", addr) if err != nil { log.Println(addr, err) return } defer c.Close() e := json.NewEncoder(c) for m := range ch { err := e.Encode(m) if err != nil { log.Println(addr, err) return } } } var seenIDs = struct { m map[string]bool sync.Mutex }{m: make(map[string]bool)} // Seen returns true if the specified id has been seen before. // If not, it returns false and marks the given id as "seen". func Seen(id string) bool { seenIDs.Lock() ok := seenIDs.m[id] seenIDs.m[id] = true seenIDs.Unlock() return ok }