worker pool

Start a fixed number of worker goroutines and hand the tasks out to them for processing.

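In its smallest form, the pattern is a task channel, a fixed set of worker goroutines ranging over that channel, and a sync.WaitGroup to wait for them once the channel is closed. The sketch below is only a minimal illustration of that skeleton (the job values and worker count are made up); the examples that follow flesh it out.

package main

import (
	"fmt"
	"sync"
)

// worker drains jobs until the channel is closed.
func worker(id int, jobs <-chan int, wg *sync.WaitGroup) {
	defer wg.Done()
	for j := range jobs {
		fmt.Printf("worker %d got job %d\n", id, j)
	}
}

func main() {
	const numWorkers = 4
	jobs := make(chan int, numWorkers)
	var wg sync.WaitGroup

	// start a fixed number of workers first
	for i := 1; i <= numWorkers; i++ {
		wg.Add(1)
		go worker(i, jobs, &wg)
	}

	// feed the tasks, then close the channel so the range loops can end
	for j := 1; j <= 10; j++ {
		jobs <- j
	}
	close(jobs)

	wg.Wait()
}
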
Computing MD5 checksums with a worker pool

package main

import (
	"crypto/md5"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"runtime"
	"sync"
)

var numWorkers = runtime.NumCPU() // use the number of CPU cores as the worker count

type FileTask struct {
	Path string `json:"path"`
}

type FileResult struct {
	Path string `json:"path,omitempty"`
	MD5  string `json:"md5,omitempty"`
}

func worker(id int, tasks <-chan FileTask, results chan<- FileResult, errChan chan<- error) {
	for task := range tasks {
		fmt.Printf("worker %d: processing %s\n", id, task.Path)
		file, err := os.Open(task.Path)
		if err != nil {
			errChan <- err
			continue
		}
		hash := md5.New()
		_, err = io.Copy(hash, file)
		file.Close() // close inside the loop; a defer here would hold every file open until the worker exits
		if err != nil {
			errChan <- err
			continue
		}
		hashInBytes := hash.Sum(nil)
		md5String := fmt.Sprintf("%x", hashInBytes)
		results <- FileResult{
			Path: task.Path,
			MD5:  md5String,
		}
	}
}

func sendTasks(dir string, tasks chan<- FileTask, errChan chan<- error) {
	err := filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
		if err != nil {
			return err
		}
		if !info.IsDir() {
			tasks <- FileTask{
				Path: path,
			}
			fmt.Printf("sending task: %s\n", path)
		}
		return nil
	})
	if err != nil {
		errChan <- err
	}
	close(tasks)
}

func processErr(errChan <-chan error, done chan<- struct{}) {
	for err := range errChan {
		fmt.Println("error:", err)
	}
	done <- struct{}{} // signal that error handling is finished
}

func outputResults(results <-chan FileResult, done chan<- struct{}) {
	finalResults := make([]FileResult, 0)

	for result := range results {
		finalResults = append(finalResults, result)
	}
	// errChan may already be closed once results has drained, so report a
	// marshalling error here directly instead of sending it on errChan
	bytes, err := json.MarshalIndent(finalResults, "", "\t")
	if err != nil {
		fmt.Println("marshal error:", err)
	}
	fmt.Println(string(bytes))
	done <- struct{}{} // signal that result handling is finished
}

func main() {
	dir := ".."

	bufferSize := numWorkers * 2 // buffer size is twice the number of workers

	tasks := make(chan FileTask, bufferSize)
	results := make(chan FileResult, bufferSize)
	errChan := make(chan error, bufferSize)
	// create synchronization channels
	resultsDone := make(chan struct{})
	errorsDone := make(chan struct{})

	var wg sync.WaitGroup

	// start the workers
	for w := 1; w <= numWorkers; w++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			worker(id, tasks, results, errChan)
		}(w)
	}

	// start sending tasks
	go sendTasks(dir, tasks, errChan)

	// start result handling
	go outputResults(results, resultsDone)

	// start error handling
	go processErr(errChan, errorsDone)

	// wait for all workers to finish
	go func() {
		wg.Wait()
		close(results) // all workers are done, so the results channel can be closed
		close(errChan) // all workers are done, so the errChan channel can be closed
	}()

	// wait for result handling and error handling to finish
	<-resultsDone
	<-errorsDone

	fmt.Println("all files processed~")
}


Downloading images with a worker pool

package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"path/filepath"
	"strings"
	"sync"
)

// Concurrent downloader
// Description: write a program that downloads files from multiple URLs at the same time. Each download task runs in a goroutine, and sync.WaitGroup waits for all downloads to finish.
// Skills: goroutines, sync.WaitGroup, HTTP requests, error handling.

var wg sync.WaitGroup
var uploadDir = "uploads"

// downloadFile reports failures via the returned error, so a single failed
// download does not bring down the whole pool.
func downloadFile(url string) error {
	res, err := http.Get(url)
	if err != nil {
		return fmt.Errorf("request failed: %w", err)
	}
	defer res.Body.Close()
	if res.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status code: %d", res.StatusCode)
	}
	contentType := res.Header.Get("Content-Type")
	if contentType == "" {
		return fmt.Errorf("missing Content-Type header")
	}
	urlPath := res.Request.URL.String()
	lastIndex := strings.LastIndex(urlPath, "/")
	fileName := urlPath[lastIndex+1:]
	if err := os.MkdirAll(uploadDir, 0777); err != nil {
		return err
	}
	filePath := filepath.Join(uploadDir, fileName)

	fd, err := os.Create(filePath)
	if err != nil {
		return err
	}
	defer fd.Close()
	if _, err := io.Copy(fd, res.Body); err != nil {
		return fmt.Errorf("failed to write file: %w", err)
	}
	fmt.Println("file saved to:", filePath)
	return nil
}

func worker(id int, tasks <-chan string) {
	defer wg.Done()
	for url := range tasks {
		if err := downloadFile(url); err != nil {
			log.Println("worker id:", id, "download failed:", err)
			continue
		}
		log.Println("worker id:", id, "downloaded file successfully")
	}
}
func produce(tasks chan<- string, urls []string, times int) {
	for range times {
		for _, url := range urls {
			tasks <- url
		}
	}
	close(tasks)
}
func main() {
	arr := []string{
		"https://imgapi.xl0408.top/index.php",
		"https://www.dmoe.cc/random.php",
		"https://img.paulzzh.com/touhou/random",
	}
	tasks := make(chan string, 5)

	numWorkers := 20
	wg.Add(numWorkers)
	for i := 0; i < numWorkers; i++ {
		go worker(i, tasks)
	}
	go produce(tasks, arr, 100)
	wg.Wait()

}


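A second version of the same downloader, extended with a shared HTTP client (connection pooling), a progress-reporting Reader, and a simple retry loop:
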
package main

import (
	"fmt"
	"io"
	"net/http"
	"os"
	"path"
	"strings"
	"sync"
	"time"
)

// global HTTP client (with connection-pool tuning)
var client = &http.Client{
	Transport: &http.Transport{
		MaxIdleConns:        50,
		MaxIdleConnsPerHost: 10,
		IdleConnTimeout:     30 * time.Second,
	},
	Timeout: 30 * time.Second,
}

// Reader that reports download progress
type ProgressReader struct {
	io.Reader
	Total      int64
	Downloaded int64
	OnProgress func(percent int)
}

func (pr *ProgressReader) Read(p []byte) (n int, err error) {
	n, err = pr.Reader.Read(p)
	pr.Downloaded += int64(n)
	if pr.OnProgress != nil && pr.Total > 0 {
		percent := int(float64(pr.Downloaded) / float64(pr.Total) * 100)
		pr.OnProgress(percent)
	}
	return
}

// core function: download an image and save it to disk
func downloadAndSave(imgURL string, saveDir string) error {
	// MkdirAll is a no-op when the directory already exists, so no Stat check is needed
	if err := os.MkdirAll(saveDir, os.ModePerm); err != nil {
		return fmt.Errorf("failed to create directory: %v", err)
	}
	// 1. issue the request
	resp, err := client.Get(imgURL)
	if err != nil {
		return fmt.Errorf("request failed: %v", err)
	}
	defer resp.Body.Close()

	// 2. check the status code
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("unexpected status code: %d", resp.StatusCode)
	}

	// 3. derive the file name from the final (post-redirect) URL
	url := resp.Request.URL.String()
	lastIndex := strings.LastIndex(url, "/")
	fileName := url[lastIndex+1:]

	filepath := path.Join(saveDir, fileName)

	// 4. create the output file
	file, err := os.Create(filepath)
	if err != nil {
		return fmt.Errorf("failed to create file: %v", err)
	}
	defer file.Close()

	// 5. download with progress reporting
	progressReader := &ProgressReader{
		Reader:     resp.Body,
		Total:      resp.ContentLength,
		OnProgress: func(percent int) { fmt.Printf("\rdownload progress: %d%%", percent) },
	}

	if _, err := io.Copy(file, progressReader); err != nil {
		return fmt.Errorf("failed to write file: %v", err)
	}
	fmt.Println() // newline after the progress output

	return nil
}

// worker: pulls URLs from the jobs channel and downloads each with retries
func downloadWorker(id int, jobs <-chan string, wg *sync.WaitGroup, saveDir string) {
	defer wg.Done()
	for imgURL := range jobs {
		fmt.Printf("[Worker%d] processing %s\n", id, imgURL)

		// retry up to 3 times
		var err error
		for retry := 0; retry < 3; retry++ {
			err = downloadAndSave(imgURL, saveDir)
			if err == nil {
				fmt.Printf("[Worker%d] download succeeded (◕‿◕✿)\n", id)
				break
			}
			fmt.Printf("[Worker%d] attempt %d failed, retrying...\n", id, retry+1)
			time.Sleep(1 * time.Second)
		}

		if err != nil {
			fmt.Printf("[Worker%d] failed after retries: %v\n", id, err)
		}
	}
}
func main() {
	jobs := make(chan string, 100) // buffered channel
	var wg sync.WaitGroup

	// start the workers first
	for w := 1; w <= 5; w++ {
		wg.Add(1)
		go downloadWorker(w, jobs, &wg, "./images")
	}

	// dispatch tasks from a separate goroutine
	go func() {
		for i := 0; i < 100; i++ {
			url := "https://imgapi.xl0408.top/index.php"
			jobs <- url
			time.Sleep(100 * time.Millisecond) // throttle the dispatch rate
		}
		close(jobs) // important: otherwise the workers would block forever on the range loop
	}()

	wg.Wait() // the main goroutine only has to wait
	fmt.Println("all tasks finished! ヽ(✿゚▽゚)ノ")
}

Computing squares with a worker pool

package main

import (
	"fmt"
	"runtime"
	"sync"
)

type Task struct {
	Number int
}
type Result struct {
	Number    int
	CalResult int
}

func createTasks(nums []int, tasks chan<- Task) {
	for _, num := range nums {
		tasks <- Task{Number: num}
	}
	close(tasks)
}
func worker(id int, tasks <-chan Task, results chan<- Result) {
	fmt.Println("worker ", id, " calculating task! ")
	for task := range tasks {
		num := task.Number
		results <- Result{Number: num, CalResult: num * num}
	}
}
func main() {
	wg := sync.WaitGroup{}
	numWorkers := runtime.NumCPU()
	nums := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	// the results buffer must be able to hold every result, because main only
	// drains it after wg.Wait(); a smaller buffer would deadlock the workers
	results := make(chan Result, len(nums))
	tasks := make(chan Task, len(nums))
	go createTasks(nums, tasks)
	for i := 0; i < numWorkers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			worker(id, tasks, results)
		}(i) // pass i as an argument so each worker gets its own id (needed before Go 1.22)
	}

	wg.Wait()
	close(results)
	for res := range results {
		fmt.Println(res)
	}
}