// Skeleton to part 9 of the Whispering Gophers code lab. // // This program extends part 8. // // It connects to the peer specified by -peer. // It accepts connections from peers and receives messages from them. // When it sees a peer with an address it hasn't seen before, it makes a // connection to that peer. // It adds an ID field containing a random string to each outgoing message. // When it receives a message with an ID it hasn't seen before, it broadcasts // that message to all connected peers. // package main import ( "bufio" "encoding/json" "flag" "fmt" "log" "net" "os" "sync" "code.osinet.fr/fgm/whispering_gophers/util" ) var ( listenAddr = flag.String("listen", "", "peer host:port") peerAddr = flag.String("peer", "", "peer host:port") self string ) type Message struct { // Add ID field ID string Addr string Body string } func main() { flag.Parse() var l net.Listener var err error // Create a new listener using util.Listen and put it in a variable named l. if *listenAddr == "" { l, err = util.ListenOnFirstUsableInterface() } else { l, err = net.Listen("tcp4", *listenAddr) } if err != nil { log.Fatal(err) } self = l.Addr().String() log.Println("Listening on", self) go dial(*peerAddr) go readInput() for { c, err := l.Accept() if err != nil { log.Fatal(err) } go serve(c) } } var peers = &Peers{m: make(map[string]chan<- Message)} type Peers struct { m map[string]chan<- Message mu sync.RWMutex } // Add creates and returns a new channel for the given peer address. // If an address already exists in the registry, it returns nil. func (p *Peers) Add(addr string) <-chan Message { p.mu.Lock() defer p.mu.Unlock() if _, ok := p.m[addr]; ok { return nil } ch := make(chan Message) p.m[addr] = ch return ch } // Remove deletes the specified peer from the registry. func (p *Peers) Remove(addr string) { p.mu.Lock() defer p.mu.Unlock() delete(p.m, addr) } // List returns a slice of all active peer channels. func (p *Peers) List() []chan<- Message { p.mu.RLock() defer p.mu.RUnlock() l := make([]chan<- Message, 0, len(p.m)) for _, ch := range p.m { l = append(l, ch) } return l } func broadcast(m Message) { for _, ch := range peers.List() { select { case ch <- m: default: // Okay to drop messages sometimes. } } } func serve(c net.Conn) { defer c.Close() d := json.NewDecoder(c) for { var m Message err := d.Decode(&m) if err != nil { log.Println(err) return } // If this message has seen before, ignore it. if Seen(m.ID) { continue } fmt.Printf("%#v\n", m) broadcast(m) go dial(m.Addr) } } func readInput() { s := bufio.NewScanner(os.Stdin) for s.Scan() { m := Message{ // Use util.RandomID to populate the ID field. Addr: self, Body: s.Text(), ID: util.RandomID(), } Seen(m.ID) broadcast(m) } if err := s.Err(); err != nil { log.Fatal(err) } os.Exit(0) } func dial(addr string) { if addr == self { return // Don't try to dial self. } ch := peers.Add(addr) if ch == nil { return // Peer already connected. } defer peers.Remove(addr) c, err := net.Dial("tcp", addr) if err != nil { log.Println(addr, err) return } defer c.Close() e := json.NewEncoder(c) for m := range ch { err := e.Encode(m) if err != nil { log.Println(addr, err) return } } } // Create a new map of seen message IDs and a mutex to protect it. var seenIDs = struct { m map[string]bool sync.Mutex }{m: make(map[string]bool)} // Seen returns true if the specified id has been seen before. // If not, it returns false and marks the given id as "seen". func Seen(id string) bool { // Get a write lock on the seen message IDs map and unlock it at before returning. seenIDs.Lock() defer seenIDs.Unlock() // Check if the id has been seen before and return that later. ok := seenIDs.m[id] // Mark the ID as seen in the map. seenIDs.m[id] = true return ok }