Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 35 additions & 15 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,37 +63,57 @@ func validateOrders(receivedOrdersChannel <-chan model.Order) (<-chan model.Orde
return validOrdersChannel, inValidOrdersChannel
}

// multiple producer
func reserverInventory(in <-chan model.Order) <-chan model.Order {

out := make(chan model.Order)
workers := 4
var wg sync.WaitGroup
wg.Add(workers)

for i := 0; i < workers; i++ {
go func() {
defer wg.Done()

for order := range in {
order.Status = model.Reserved
out <- order
}
}()
}

go func() {
for order := range in {
order.Status = model.Reserved
out <- order
}
wg.Wait()
close(out)
}()

return out
}

// multiple consumer
func fillOrders(in <-chan model.Order, wg *sync.WaitGroup) {
workers := 4
wg.Add(workers)

for i := 0; i < workers; i++ {
go func() {
defer wg.Done()
for order := range in {
order.Status = model.Filled
fmt.Printf("Order has been completed -> %v \n", order.String())
}
}()
}
}

func main() {
var wg sync.WaitGroup
wg.Add(1)

receivedOrdersChannel := receiveOrders()
validOrdersChannel, inValidOrdersChannel := validateOrders(receivedOrdersChannel)
reservedOrdersChannel := reserverInventory(validOrdersChannel)

var wg sync.WaitGroup
wg.Add(2)

go func(reservedOrderChannel <-chan model.Order) {
defer wg.Done()

for reservedOrder := range reservedOrdersChannel {
fmt.Printf("Inventory recived for order -> %v \n", reservedOrder.String())
}
}(receivedOrdersChannel)
fillOrders(reservedOrdersChannel, &wg)

go func(invalidOrdersChannel <-chan model.InvalidOrder) {
defer wg.Done()
Expand Down