
multi-threading-processing

@anhvth/multi-threading-processing

Comprehensive guide for using multi-threading and multi-processing in Python, including when to choose each approach, best practices, and practical examples using the speedy_utils library.

Installation

$ npx agent-skills-cli install @anhvth/multi-threading-processing
Supported assistants: Claude Code, Cursor, Copilot, Codex, Antigravity

Details

Path: .github/skills/multi-threading-processing/SKILL.md
Branch: master
Scoped Name: @anhvth/multi-threading-processing

Usage

After installing, this skill will be available to your AI coding assistant.

Verify installation:

npx agent-skills-cli list

Skill Instructions


name: 'multi-threading-processing'
description: 'Comprehensive guide for using multi-threading and multi-processing in Python, including when to choose each approach, best practices, and practical examples using the speedy_utils library.'

Multi-Threading and Multi-Processing Guide

This skill provides comprehensive guidance for implementing concurrent and parallel processing in Python applications. It covers when to use multi-threading vs multi-processing, best practices, performance considerations, and practical examples using the speedy_utils library's multi-worker implementations.

When to Use Multi-Threading vs Multi-Processing

Multi-Threading (Use for I/O-bound tasks)

Best for:

  • Network requests (HTTP, database queries, API calls)
  • File I/O operations (reading/writing files)
  • GUI applications that need to remain responsive
  • Tasks waiting for external resources

When to choose threading:

# I/O-bound example: Downloading multiple files
import requests
from speedy_utils import multi_thread

def download_file(url):
    response = requests.get(url)
    return len(response.content)

urls = ['https://example.com/file1', 'https://example.com/file2']
results = multi_thread(download_file, urls, workers=10)

Multi-Processing (Use for CPU-bound tasks)

Best for:

  • CPU-intensive computations (mathematical calculations, data processing)
  • Image/video processing
  • Machine learning model training/inference
  • Scientific computing

When to choose processing:

# CPU-bound example: Heavy computation
from speedy_utils import multi_process

def fibonacci(n):
    if n <= 1:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)

numbers = list(range(30, 35))
results = multi_process(fibonacci, numbers, workers=4)

Key Decision Factors

| Factor        | Multi-Threading             | Multi-Processing             |
|---------------|-----------------------------|------------------------------|
| GIL impact    | Limited by GIL              | Bypasses GIL                 |
| Memory        | Shared memory               | Separate memory spaces       |
| Communication | Easy (shared variables)     | Complex (pickling required)  |
| Overhead      | Low                         | High (process creation)      |
| Scalability   | Limited by GIL for CPU work | Scales with cores            |
| Debugging     | Easier                      | More complex                 |

Best Practices

1. Choose the Right Backend

The speedy_utils library provides multiple backends:

from speedy_utils import multi_process

# For I/O-bound tasks
results = multi_process(func, items, backend='threadpool')

# For CPU-bound tasks
results = multi_process(func, items, backend='mp')  # multiprocessing

# For distributed computing (if Ray available)
results = multi_process(func, items, backend='ray')

2. Optimize Worker Count

import os
from speedy_utils import multi_thread

# For threading: Use more workers than CPU cores
cpu_count = os.cpu_count() or 4
thread_workers = cpu_count * 2  # Good starting point

# For processing: Match CPU cores (or slightly less)
process_workers = max(1, cpu_count - 1)

results = multi_thread(func, items, workers=thread_workers)

3. Handle Errors Gracefully

# Continue processing even if some tasks fail
results = multi_thread(
    func,
    items,
    stop_on_error=False,  # Don't abort on first error
    workers=4
)

# Check for None values (failed tasks)
successful_results = [r for r in results if r is not None]
failed_count = results.count(None)

4. Use Batching for Small Tasks

# For many small tasks, batch them to reduce overhead
def process_batch(items):
    return [item * 2 for item in items]

results = multi_thread(
    process_batch,
    items,
    batch=10,  # Process 10 items per task
    workers=4
)

5. Monitor and Clean Up Resources

from speedy_utils import kill_all_thread, cleanup_phantom_workers

# Force cleanup of stuck threads/processes
killed_threads = kill_all_thread()
killed_processes = cleanup_phantom_workers()
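
These helpers pair naturally with a try/finally around a long-running job, so cleanup runs even if the batch raises. A minimal sketch (the timeout parameter also appears later in this guide):

from speedy_utils import kill_all_thread, multi_thread

try:
    results = multi_thread(func, items, workers=8, timeout=300)
finally:
    kill_all_thread()  # Make sure no stuck threads outlive the run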

Performance Considerations

Memory Usage

Threading: Shared memory, lower overhead

# Threads share memory - efficient for large datasets
large_data = load_large_dataset()
results = multi_thread(process_item, large_data, workers=8)

Processing: Separate memory spaces, higher overhead

# Each process gets a copy - use for CPU-bound work
results = multi_process(cpu_intensive_func, data, workers=4)

Communication Between Workers

Threading: Easy communication

import threading

from speedy_utils import multi_thread

shared_results = []
lock = threading.Lock()

def thread_safe_append(item):
    with lock:  # Lock guards the shared list across threads
        shared_results.append(process_item(item))

multi_thread(thread_safe_append, items, workers=4)

Processing: Requires serialization

# Use return values or shared storage
def process_and_return(item):
    result = heavy_computation(item)
    return result  # Pickled and sent back

results = multi_process(process_and_return, items, workers=4)

Common Patterns and Examples

Pattern 1: Parallel Data Processing

from speedy_utils import multi_thread
import pandas as pd

def process_dataframe_chunk(chunk):
    # Process a chunk of data
    return chunk['value'].sum()

# Split large dataframe into chunks
df = pd.read_csv('large_file.csv')
chunks = [df[i:i+1000] for i in range(0, len(df), 1000)]

results = multi_thread(process_dataframe_chunk, chunks, workers=8)
total = sum(results)

Pattern 2: Concurrent API Calls

import asyncio

import aiohttp
from speedy_utils import multi_thread

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.text()

def fetch_multiple_urls(urls):
    async def main():
        async with aiohttp.ClientSession() as session:
            tasks = [fetch_url(session, url) for url in urls]
            return await asyncio.gather(*tasks)

    return asyncio.run(main())

results = multi_thread(fetch_multiple_urls, [urls], workers=1)  # One worker drives the whole async batch

Pattern 3: CPU-Intensive Computation

from speedy_utils import multi_process
from PIL import Image
import numpy as np

def process_image(image_path):
    img = Image.open(image_path)
    # Heavy image processing
    processed = np.asarray(img, dtype=np.float64) ** 2  # Example computation (float avoids uint8 overflow)
    return processed.mean()

image_paths = ['image1.jpg', 'image2.jpg', ...]
results = multi_process(process_image, image_paths, workers=4)

Error Handling and Debugging

Custom Exception Handling

from speedy_utils import multi_thread

def risky_operation(x):
    if x == 42:
        raise ValueError(f"Bad input: {x}")
    return x * 2

# Catch and handle errors
try:
    results = multi_thread(risky_operation, range(100), stop_on_error=True)
except Exception as e:
    print(f"Processing failed: {e}")
    # Handle error appropriately

Logging and Monitoring

import logging
from speedy_utils import multi_thread

logging.basicConfig(level=logging.INFO)

def monitored_task(item):
    logging.info(f"Processing item: {item}")
    result = process_item(item)
    logging.info(f"Completed item: {item}")
    return result

results = multi_thread(monitored_task, items, workers=4)

Integration with Other Libraries

With tqdm for Progress Bars

import time

from speedy_utils import multi_thread

def slow_task(x):
    time.sleep(0.1)  # Simulate work
    return x ** 2

# Automatic progress bar
results = multi_thread(slow_task, range(100), progress=True)

With Ray for Distributed Computing

from speedy_utils import multi_process

# Automatically uses Ray if available
results = multi_process(
    heavy_computation,
    large_dataset,
    backend='ray',
    workers=16  # Can exceed local cores
)
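
If you manage the Ray cluster yourself, you would normally initialize or connect to it first. A hedged sketch, assuming speedy_utils attaches to an already-initialized Ray session (check the library's docs; ray.init and its address argument are standard Ray APIs):

import ray
from speedy_utils import multi_process

# 'auto' connects to a running cluster; omit address to start a local one
ray.init(address='auto', ignore_reinit_error=True)

results = multi_process(
    heavy_computation,
    large_dataset,
    backend='ray',
    workers=16,
)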

With Pandas

import pandas as pd
from speedy_utils import multi_thread

def process_row(row):
    # Process a single row
    return row['value'] * 2

df = pd.read_csv('data.csv')

# Process rows in parallel
results = multi_thread(process_row, df.to_dict('records'), workers=8)
processed_df = pd.DataFrame(results)
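
If multi_thread preserves input order (this sketch assumes it does; verify for your version of the library), the outputs can also be attached back to the original frame:

# Assumes results come back in the same order as df's rows
df['processed'] = results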

Performance Benchmarking

Compare Different Approaches

import time
from speedy_utils import multi_thread, multi_process

def benchmark_func(x):
    # Some computation
    return sum(i**2 for i in range(x))

data = list(range(100, 200))

# Benchmark threading
start = time.time()
thread_results = multi_thread(benchmark_func, data, workers=8, progress=False)
thread_time = time.time() - start

# Benchmark processing
start = time.time()
process_results = multi_process(benchmark_func, data, workers=4, progress=False, backend='safe')
process_time = time.time() - start

# Benchmark sequential
start = time.time()
sequential_results = [benchmark_func(x) for x in data]
sequential_time = time.time() - start

print(f"Threading: {thread_time:.2f}s")
print(f"Processing: {process_time:.2f}s")
print(f"Sequential: {sequential_time:.2f}s")

Troubleshooting Common Issues

1. GIL Limitations

Problem: Python's GIL prevents true parallel execution in threads for CPU-bound tasks.

Solution: Use multi-processing for CPU-bound work.
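
A minimal sketch of the switch, assuming the task function is defined at module level so it can be pickled:

from speedy_utils import multi_process

def cpu_heavy(n):
    # Pure-Python arithmetic holds the GIL, so threads would serialize here
    return sum(i * i for i in range(n))

inputs = [10_000_000] * 8
results = multi_process(cpu_heavy, inputs, workers=4)  # True parallelism across cores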

2. Pickle Errors

Problem: Functions/objects can't be pickled for inter-process communication.

Solution: Define functions at module level, avoid lambdas.

# Good: Module-level function
def process_item(x):
    return x * 2

# Bad: Lambda
lambda x: x * 2  # Can't be pickled
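
When the worker needs extra arguments (the usual reason for reaching for a lambda), functools.partial over a module-level function stays picklable. A short sketch; scale_item and its scale parameter are illustrative:

from functools import partial

from speedy_utils import multi_process

def scale_item(x, scale):
    return x * scale

# partial objects wrapping module-level functions pickle cleanly,
# unlike lambdas and nested closures
results = multi_process(partial(scale_item, scale=3), range(100), workers=4)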

3. Memory Issues

Problem: Large datasets copied to each process.

Solution: Use threading for large data, or lazy loading.
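
One lazy-loading sketch: hand each worker a lightweight reference (here, hypothetical shard file paths) so the full dataset is never copied into every process:

from speedy_utils import multi_process

def process_file(path):
    # Only the short path string crosses the process boundary;
    # each worker loads just the file it was handed
    with open(path, 'rb') as f:
        return len(f.read())

paths = ['data/part-000.bin', 'data/part-001.bin']  # hypothetical shard files
results = multi_process(process_file, paths, workers=4)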

4. Deadlocks

Problem: Threads/processes waiting indefinitely.

Solution: Use timeouts and proper cleanup.

results = multi_thread(
    func,
    items,
    timeout=300,  # 5 minute timeout
    workers=4
)

Advanced Topics

Custom Worker Pools

from concurrent.futures import ThreadPoolExecutor
from speedy_utils.multi_worker.thread import _track_executor_threads

def custom_processing(func, items, workers=4):
    with ThreadPoolExecutor(max_workers=workers) as executor:
        _track_executor_threads(executor)  # Register threads so speedy_utils cleanup can find them
        futures = [executor.submit(func, item) for item in items]
        return [f.result() for f in futures]

Async Integration

import asyncio
from speedy_utils import multi_thread

async def async_task(x):
    await asyncio.sleep(0.1)
    return x * 2

def run_async_tasks(items):
    async def main():
        return await asyncio.gather(*[async_task(x) for x in items])

    return asyncio.run(main())

# Run multiple async batches in parallel
batches = [items[i:i+10] for i in range(0, len(items), 10)]
results = multi_thread(run_async_tasks, batches, workers=4)

Large-Scale Dataset Processing

For large HuggingFace datasets and other big-data workloads, multi_process works especially well when combined with data-sharding patterns. See the dedicated dataset-processing-multiprocessing skill for:

  • Distributing datasets across workers via row ranges
  • Worker function architecture (imports, setup, cleanup)
  • Temporary file management during transformation
  • Integration with tokenizers, Megatron, format converters
  • Error resilience and graceful degradation
  • Merging partial results after parallel processing

Quick Example:

from speedy_utils import multi_process
from datasets import load_from_disk

def process_shard(args):
    shard_id, start_idx, end_idx, dataset_path = args
    ds = load_from_disk(dataset_path)
    # Process slice [start_idx:end_idx]
    transformed = [transform(ds[i]) for i in range(start_idx, end_idx)]
    # Save result to temp location
    return save_shard(transformed, shard_id)

# Distribute 100k rows across 4 workers
num_records = 100000
num_workers = 4
rows_per_worker = num_records // num_workers

worker_args = [
    # The last worker absorbs any remainder rows from the integer division
    (i, i * rows_per_worker,
     num_records if i == num_workers - 1 else (i + 1) * rows_per_worker,
     'path/to/dataset')
    for i in range(num_workers)
]

shard_results = multi_process(process_shard, worker_args, workers=num_workers)
# Merge shard_results to get final dataset
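
If each save_shard call writes its shard to disk in HuggingFace format and returns the path, the merge step can be a plain concatenation. A sketch under that assumption (concatenate_datasets and save_to_disk are standard datasets APIs):

from datasets import concatenate_datasets, load_from_disk

# Assumes shard_results holds the paths returned by save_shard
shards = [load_from_disk(path) for path in shard_results]
merged = concatenate_datasets(shards)
merged.save_to_disk('path/to/merged_dataset')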

References

This skill leverages the speedy_utils library's multi-worker implementations:

  • speedy_utils.multi_thread() - Advanced threading with batching, progress bars, and error handling
  • speedy_utils.multi_process() - Multi-processing with multiple backends (Ray, multiprocessing, threading)
  • speedy_utils.kill_all_thread() - Emergency thread cleanup
  • speedy_utils.cleanup_phantom_workers() - Process cleanup utilities

See the library's test files for additional examples and edge cases.