Concurrent file sync with Go and MinIO

Synchronizing files from your object storage to your local environment often requires handling many files efficiently. In this DevTip, we demonstrate how to build a simple, concurrent file synchronization tool in Go using MinIO, an open-source, S3-compatible object storage server.
Set up MinIO server
Before we start coding, let's set up a local MinIO server using Docker. This will give us a development environment to test our implementation:
docker run \
-p 9000:9000 \
-p 9001:9001 \
--name minio \
-v ~/minio/data:/data \
-e "MINIO_ROOT_USER=minioadmin" \
-e "MINIO_ROOT_PASSWORD=minioadmin" \
quay.io/minio/minio server /data --console-address ":9001"
This command starts a MinIO server with the default credentials. You can access the web console at http://localhost:9001 to manage your buckets and files.
Initialize the project
Create a new Go project and install the MinIO SDK (requires Go 1.21 or later):
go mod init minio-sync
go get github.com/minio/minio-go/v7@v7.0.87
Set up the MinIO client
Create a secure connection to your MinIO instance with proper error handling and TLS configuration:
package main

import (
	"context"
	"crypto/tls"
	"net/http"
	"time"

	"github.com/minio/minio-go/v7"
	"github.com/minio/minio-go/v7/pkg/credentials"
)

func createMinioClient(ctx context.Context) (*minio.Client, error) {
	endpoint := "localhost:9000"
	accessKeyID := "minioadmin"     // Use environment variables in production
	secretAccessKey := "minioadmin" // Use environment variables in production
	useSSL := false

	// Configure the transport; TLS 1.2+ is enforced when useSSL is enabled
	transport := &http.Transport{
		TLSClientConfig: &tls.Config{MinVersion: tls.VersionTLS12},
		IdleConnTimeout: 90 * time.Second,
	}

	// Initialize the MinIO client
	opts := &minio.Options{
		Creds:     credentials.NewStaticV4(accessKeyID, secretAccessKey, ""),
		Secure:    useSSL,
		Transport: transport,
	}
	client, err := minio.New(endpoint, opts)
	if err != nil {
		return nil, err
	}

	// Verify connectivity with a lightweight call
	if _, err := client.BucketExists(ctx, "test-bucket"); err != nil {
		return nil, err
	}
	return client, nil
}
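The hard-coded credentials above are for local development only. One way to load them from the environment instead is a small helper like the sketch below; the variable names MINIO_ENDPOINT, MINIO_ACCESS_KEY, and MINIO_SECRET_KEY are our own convention here, not anything the SDK mandates:

```go
package main

import "os"

// getenvDefault returns the value of the environment variable key,
// or def when the variable is unset or empty.
func getenvDefault(key, def string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return def
}

// loadMinioConfig gathers connection settings from the environment,
// falling back to local-development defaults.
func loadMinioConfig() (endpoint, accessKey, secretKey string) {
	endpoint = getenvDefault("MINIO_ENDPOINT", "localhost:9000")
	accessKey = getenvDefault("MINIO_ACCESS_KEY", "minioadmin")
	secretKey = getenvDefault("MINIO_SECRET_KEY", "minioadmin")
	return
}
```

In production you would drop the fallback defaults entirely and fail fast when a variable is missing.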
Implement concurrent file downloads
Here's an improved implementation with proper error handling, context management, and cleanup:
package main

import (
	"context"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"sync"
	"time"

	"github.com/minio/minio-go/v7"
)

type DownloadResult struct {
	ObjectName string
	Error      error
}

func downloadFiles(ctx context.Context, client *minio.Client, bucketName, outputDir string) error {
	// Ensure the output directory exists
	if err := os.MkdirAll(outputDir, 0755); err != nil {
		return fmt.Errorf("failed to create output directory: %w", err)
	}

	// Buffered channels for work distribution
	jobs := make(chan string, 100)
	results := make(chan DownloadResult, 100)

	// Start the worker pool
	var wg sync.WaitGroup
	workerCount := 5
	for i := 0; i < workerCount; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for objectName := range jobs {
				err := downloadObject(ctx, client, bucketName, objectName, outputDir)
				results <- DownloadResult{ObjectName: objectName, Error: err}
			}
		}()
	}

	// Close results once all workers finish
	go func() {
		wg.Wait()
		close(results)
	}()

	// List and queue objects in a separate goroutine so the main
	// goroutine can drain results concurrently. Queuing and draining
	// from the same goroutine can deadlock once both channels fill up.
	listErr := make(chan error, 1)
	go func() {
		defer close(jobs)
		opts := minio.ListObjectsOptions{Recursive: true}
		for obj := range client.ListObjects(ctx, bucketName, opts) {
			if obj.Err != nil {
				listErr <- fmt.Errorf("error listing objects: %w", obj.Err)
				return
			}
			select {
			case jobs <- obj.Key:
			case <-ctx.Done():
				listErr <- ctx.Err()
				return
			}
		}
	}()

	// Collect download results
	var downloadErrors []error
	for result := range results {
		if result.Error != nil {
			downloadErrors = append(downloadErrors, fmt.Errorf("%s: %w", result.ObjectName, result.Error))
		}
	}

	// Surface any listing error first
	select {
	case err := <-listErr:
		return err
	default:
	}

	if len(downloadErrors) > 0 {
		return fmt.Errorf("encountered %d download errors: %v", len(downloadErrors), downloadErrors)
	}
	return nil
}

func downloadObject(ctx context.Context, client *minio.Client, bucket, objectName, outputDir string) error {
	// Bound each download with a timeout
	ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
	defer cancel()

	// Fetch the object
	obj, err := client.GetObject(ctx, bucket, objectName, minio.GetObjectOptions{})
	if err != nil {
		return fmt.Errorf("failed to get object: %w", err)
	}
	defer obj.Close()

	// Create the output file, including any intermediate directories
	outputPath := filepath.Join(outputDir, objectName)
	if err := os.MkdirAll(filepath.Dir(outputPath), 0755); err != nil {
		return fmt.Errorf("failed to create directories: %w", err)
	}
	file, err := os.Create(outputPath)
	if err != nil {
		return fmt.Errorf("failed to create file: %w", err)
	}
	defer file.Close()

	// Stream the object to disk
	if _, err := io.Copy(file, obj); err != nil {
		return fmt.Errorf("failed to download file: %w", err)
	}
	return nil
}
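io.Copy gives no visibility into how far along a transfer is. A minimal way to add progress reporting is a small io.Writer wrapper that counts bytes as they pass through; this is a sketch of our own, not an SDK feature, and you would hook it in with io.Copy(io.MultiWriter(file, &cw), obj):

```go
package main

import "sync/atomic"

// countingWriter tracks how many bytes have been written through it.
// The atomic counter makes it safe to share one instance across all
// workers for an aggregate progress figure.
type countingWriter struct {
	n atomic.Int64
}

// Write records the chunk size and always reports success, so it
// never interferes with the real destination writer.
func (w *countingWriter) Write(p []byte) (int, error) {
	w.n.Add(int64(len(p)))
	return len(p), nil
}

// Bytes reports the total number of bytes written so far.
func (w *countingWriter) Bytes() int64 {
	return w.n.Load()
}
```

A background goroutine can then log cw.Bytes() every few seconds for a simple progress readout.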
Error handling and troubleshooting
Here are common issues you might encounter and how to resolve them:
- Connection errors:
  - Verify the MinIO endpoint is accessible
  - Check firewall settings
  - Ensure correct credentials
- Permission issues:
  - Verify bucket access rights
  - Check file system permissions for the output directory
- Resource constraints:
  - Adjust worker count based on system resources
  - Monitor memory usage with large files
  - Consider implementing rate limiting
Testing your implementation
Wire everything together with a main function to verify your implementation (it lives in the same package as the snippets above and needs the "context" and "log" imports):
func main() {
	// Create a cancellable context
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Create the MinIO client
	client, err := createMinioClient(ctx)
	if err != nil {
		log.Fatalf("Failed to create MinIO client: %v", err)
	}

	// Start the download
	if err := downloadFiles(ctx, client, "your-bucket", "./downloads"); err != nil {
		log.Fatalf("Download failed: %v", err)
	}
	log.Println("Download completed successfully")
}
Security considerations
When deploying to production:
- Use environment variables or a secure configuration manager for credentials
- Enable TLS in production environments
- Implement proper access controls on buckets
- Use temporary credentials when possible
- Regularly rotate access keys
Final thoughts
This implementation provides a robust foundation for concurrent file downloads from MinIO. The code handles errors gracefully, manages resources efficiently, and includes proper cleanup. Remember to adjust the worker count and timeout values based on your specific use case and system resources.
Happy coding!