Frederic G. MARAND 9 years ago
parent
commit
0497f253f2
4 changed files with 355 additions and 1 deletions
  1. 1 1
      part7/main.go
  2. 174 0
      part8/main.go
  3. BIN
      part8/part8
  4. 180 0
      part9/main.go

+ 1 - 1
part7/main.go

@@ -99,7 +99,7 @@ func (p *Peers) List() []chan<- Message {
 	p.mu.RLock()
 
 	// TODO: Declare a slice of chan<- Message.
-	s := make([]chan <- Message, 0)
+	var s []chan <- Message
 
 	/* TODO: Iterate over the map using range */
 	for _, v := range p.m {

+ 174 - 0
part8/main.go

@@ -0,0 +1,174 @@
+// Skeleton to part 8 of the Whispering Gophers code lab.
+//
+// This program extends part 7.
+//
+// It connects to the peer specified by -peer.
+// It accepts connections from peers and receives messages from them.
+// When it sees a peer with an address it hasn't seen before, it opens a
+// connection to that peer.
+//
+package main
+
+import (
+	"bufio"
+	"encoding/json"
+	"flag"
+	"fmt"
+	"log"
+	"net"
+	"os"
+	"sync"
+
+	"code.google.com/p/whispering-gophers/util"
+)
+
+var (
+	peerAddr = flag.String("peer", "", "peer host:port")
+	self     string
+)
+
+type Message struct {
+	Addr string
+	Body string
+}
+
+func main() {
+	flag.Parse()
+
+	l, err := util.Listen()
+	if err != nil {
+		log.Fatal(err)
+	}
+	self = l.Addr().String()
+	log.Println("Listening on", self)
+
+	go dial(*peerAddr)
+	go readInput()
+
+	for {
+		c, err := l.Accept()
+		if err != nil {
+			log.Fatal(err)
+		}
+		go serve(c)
+	}
+}
+
+// TODO: create a global shared Peers instance
+var peers Peers = Peers{m: make(map[string]chan<- Message)}
+
+type Peers struct {
+	m  map[string]chan<- Message
+	mu sync.RWMutex
+}
+
+// Add creates and returns a new channel for the given peer address.
+// If an address already exists in the registry, it returns nil.
+func (p *Peers) Add(addr string) <-chan Message {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	if _, ok := p.m[addr]; ok {
+		return nil
+	}
+	ch := make(chan Message)
+	p.m[addr] = ch
+	return ch
+}
+
+// Remove deletes the specified peer from the registry.
+func (p *Peers) Remove(addr string) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	delete(p.m, addr)
+}
+
+// List returns a slice of all active peer channels.
+func (p *Peers) List() []chan<- Message {
+	p.mu.RLock()
+	defer p.mu.RUnlock()
+	l := make([]chan<- Message, 0, len(p.m))
+	for _, ch := range p.m {
+		l = append(l, ch)
+	}
+	return l
+}
+
+func broadcast(m Message) {
+	/* TODO: Range over the list of peers */
+	for k, v := range peers.m {
+		// TODO: Send a message to the channel, but don't block.
+		// Hint: Select is your friend.
+		select {
+		case v <- m:
+			fmt.Printf("Sent %+v to %s", m, k)
+		default:
+			fmt.Printf("Failed sending to %s\n", k)
+		}
+	}
+}
+
+func serve(c net.Conn) {
+	defer c.Close()
+	d := json.NewDecoder(c)
+	for {
+		var m Message
+		err := d.Decode(&m)
+		if err != nil {
+			log.Println(err)
+			return
+		}
+
+		// TODO: Launch dial in a new goroutine, to connect to the address in the message's Addr field.
+		go dial(m.Addr)
+		fmt.Printf("%#v\n", m)
+	}
+}
+
+func readInput() {
+	s := bufio.NewScanner(os.Stdin)
+	for s.Scan() {
+		m := Message{
+			Addr: self,
+			Body: s.Text(),
+		}
+		broadcast(m)
+	}
+	if err := s.Err(); err != nil {
+		log.Fatal(err)
+	}
+}
+
+func dial(addr string) {
+	// TODO: If dialing self, return.
+	if addr == self {
+		return
+	}
+
+	// TODO: Add the address to the peers map.
+	ch := peers.Add(addr)
+
+	// TODO: If you get a nil channel the peer is already connected, return.
+	if ch == nil {
+		return
+	}
+
+	// TODO: Remove the address from peers map when this function returns
+	//       (use defer).
+	defer peers.Remove(addr)
+
+	c, err := net.Dial("tcp", addr)
+	if err != nil {
+		log.Println(addr, err)
+		return
+	}
+	defer c.Close()
+
+	e := json.NewEncoder(c)
+	for m := range ch {
+		err := e.Encode(m)
+		if err != nil {
+			log.Println(addr, err)
+			return
+		}
+	}
+}

BIN
part8/part8


+ 180 - 0
part9/main.go

@@ -0,0 +1,180 @@
+// Skeleton to part 9 of the Whispering Gophers code lab.
+//
+// This program extends part 8.
+//
+// It connects to the peer specified by -peer.
+// It accepts connections from peers and receives messages from them.
+// When it sees a peer with an address it hasn't seen before, it makes a
+// connection to that peer.
+// It adds an ID field containing a random string to each outgoing message.
+// When it recevies a message with an ID it hasn't seen before, it broadcasts
+// that message to all connected peers.
+//
+package main
+
+import (
+	"bufio"
+	"encoding/json"
+	"flag"
+	"fmt"
+	"log"
+	"net"
+	"os"
+	"sync"
+
+	"code.google.com/p/whispering-gophers/util"
+)
+
+var (
+	peerAddr = flag.String("peer", "", "peer host:port")
+	self     string
+)
+
+type Message struct {
+	ID   string
+	Addr string
+	Body string
+}
+
+func main() {
+	flag.Parse()
+
+	l, err := util.Listen()
+	if err != nil {
+		log.Fatal(err)
+	}
+	self = l.Addr().String()
+	log.Println("Listening on", self)
+
+	go dial(*peerAddr)
+	go readInput()
+
+	for {
+		c, err := l.Accept()
+		if err != nil {
+			log.Fatal(err)
+		}
+		go serve(c)
+	}
+}
+
+var peers = &Peers{m: make(map[string]chan<- Message)}
+
+type Peers struct {
+	m  map[string]chan<- Message
+	mu sync.RWMutex
+}
+
+// Add creates and returns a new channel for the given peer address.
+// If an address already exists in the registry, it returns nil.
+func (p *Peers) Add(addr string) <-chan Message {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	if _, ok := p.m[addr]; ok {
+		return nil
+	}
+	ch := make(chan Message)
+	p.m[addr] = ch
+	return ch
+}
+
+// Remove deletes the specified peer from the registry.
+func (p *Peers) Remove(addr string) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	delete(p.m, addr)
+}
+
+// List returns a slice of all active peer channels.
+func (p *Peers) List() []chan<- Message {
+	p.mu.RLock()
+	defer p.mu.RUnlock()
+	l := make([]chan<- Message, 0, len(p.m))
+	for _, ch := range p.m {
+		l = append(l, ch)
+	}
+	return l
+}
+
+func broadcast(m Message) {
+	for _, ch := range peers.List() {
+		select {
+		case ch <- m:
+		default:
+			// Okay to drop messages sometimes.
+		}
+	}
+}
+
+func serve(c net.Conn) {
+	defer c.Close()
+	d := json.NewDecoder(c)
+	for {
+		var m Message
+		err := d.Decode(&m)
+		if err != nil {
+			log.Println(err)
+			return
+		}
+
+		// TODO: If this message has seen before, ignore it.
+
+		fmt.Printf("%#v\n", m)
+		broadcast(m)
+		go dial(m.Addr)
+	}
+}
+
+func readInput() {
+	s := bufio.NewScanner(os.Stdin)
+	for s.Scan() {
+		m := Message{
+			ID:   util.RandomID(),
+			Addr: self,
+			Body: s.Text(),
+		}
+		// TODO: Mark the message ID as seen.
+		broadcast(m)
+	}
+	if err := s.Err(); err != nil {
+		log.Fatal(err)
+	}
+}
+
+func dial(addr string) {
+	if addr == self {
+		return // Don't try to dial self.
+	}
+
+	ch := peers.Add(addr)
+	if ch == nil {
+		return // Peer already connected.
+	}
+	defer peers.Remove(addr)
+
+	c, err := net.Dial("tcp", addr)
+	if err != nil {
+		log.Println(addr, err)
+		return
+	}
+	defer c.Close()
+
+	e := json.NewEncoder(c)
+	for m := range ch {
+		err := e.Encode(m)
+		if err != nil {
+			log.Println(addr, err)
+			return
+		}
+	}
+}
+
+// TODO: Create a new map of seen message IDs and a mutex to protect it.
+
+// Seen returns true if the specified id has been seen before.
+// If not, it returns false and marks the given id as "seen".
+func Seen(id string) bool {
+	// TODO: Get a write lock on the seen message IDs map and unlock it at before returning.
+	// TODO: Check if the id has been seen before and return that later.
+	// TODO: Mark the ID as seen in the map.
+}