Skip to main content

collections.deque — Double-Ended Queue

📚 Official Documentation & Resources

Overview

collections.deque (pronounced "deck") is a double-ended queue that provides O(1) append and pop operations from both ends. It's implemented as a doubly-linked list and is thread-safe, making it ideal for scenarios requiring fast insertion/deletion at both ends, such as implementing queues, stacks, and sliding window algorithms.

🎯 Key Characteristics

  • Double-Ended Operations - Fast append/pop from both left and right
  • O(1) Time Complexity - Constant time for end operations
  • Thread-Safe - Atomic append and pop operations
  • Memory Efficient - Block-based storage with minimal overhead
  • Optional Maxlen - Built-in size limitation with automatic eviction
  • Iterable - Supports all standard sequence operations

📚 Basic Usage

Simple Example

from collections import deque

# Create a deque
d = deque([1, 2, 3, 4, 5])

# Operations from both ends
d.appendleft(0) # deque([0, 1, 2, 3, 4, 5])
d.append(6) # deque([0, 1, 2, 3, 4, 5, 6])
d.popleft() # 0, deque([1, 2, 3, 4, 5, 6])
d.pop() # 6, deque([1, 2, 3, 4, 5])

# Rotation
d.rotate(2) # deque([4, 5, 1, 2, 3])
d.rotate(-1) # deque([5, 1, 2, 3, 4])

# Fixed-size deque (circular buffer)
circular = deque(maxlen=3)
for i in range(5):
circular.append(i)
print(circular)
# deque([0], maxlen=3)
# deque([0, 1], maxlen=3)
# deque([0, 1, 2], maxlen=3)
# deque([1, 2, 3], maxlen=3) # 0 was evicted
# deque([2, 3, 4], maxlen=3) # 1 was evicted

Core Methods

from collections import deque

# Initialize with iterable and optional maxlen
d = deque([1, 2, 3], maxlen=5)

# Access properties
print(d.maxlen) # 5
print(len(d)) # 3

# Convert to list for display
print(list(d)) # [1, 2, 3]

🔧 Deque API Reference

Methods

MethodDescriptionTime ComplexityReturn TypeExample
__init__(iterable=(), maxlen=None)Initialize deque with optional iterable and maxlenO(n)dequedeque([1, 2, 3], maxlen=5)
append(x)Add element to right endO(1)Noned.append(4)
appendleft(x)Add element to left endO(1)Noned.appendleft(0)
pop()Remove and return element from right endO(1)Anyx = d.pop()
popleft()Remove and return element from left endO(1)Anyx = d.popleft()
extend(iterable)Extend right end with iterableO(k)Noned.extend([5, 6, 7])
extendleft(iterable)Extend left end with iterable (reversed)O(k)Noned.extendleft([3, 2, 1])
rotate(n=1)Rotate deque n steps to the rightO(n)Noned.rotate(2)
reverse()Reverse the deque in placeO(n)Noned.reverse()
clear()Remove all elementsO(n)Noned.clear()
copy()Create shallow copyO(n)dequenew_d = d.copy()
count(x)Count occurrences of xO(n)intd.count(2)
remove(value)Remove first occurrence of valueO(n)Noned.remove(3)
index(x, start=0, stop=None)Find index of first occurrenceO(n)intd.index(2)
insert(i, x)Insert x at position iO(n)Noned.insert(1, 'new')

Properties/Attributes

AttributeDescriptionTypeExample
maxlenMaximum size of deque (None if unlimited)int or Noned.maxlen

Detailed Method Examples

from collections import deque

# Initialize test deque
d = deque([1, 2, 3, 4, 5])

# Basic operations
print(f"Original: {list(d)}") # [1, 2, 3, 4, 5]

# Append operations
d.append(6)
d.appendleft(0)
print(f"After appends: {list(d)}") # [0, 1, 2, 3, 4, 5, 6]

# Pop operations
right = d.pop() # 6
left = d.popleft() # 0
print(f"Popped {left} and {right}") # Popped 0 and 6
print(f"After pops: {list(d)}") # [1, 2, 3, 4, 5]

# Extend operations
d.extend([6, 7, 8])
print(f"After extend: {list(d)}") # [1, 2, 3, 4, 5, 6, 7, 8]

d.extendleft(['a', 'b', 'c'])
print(f"After extendleft: {list(d)}") # ['c', 'b', 'a', 1, 2, 3, 4, 5, 6, 7, 8]

# Rotation
d = deque([1, 2, 3, 4, 5])
d.rotate(2) # Move 2 elements from right to left
print(f"Rotate right 2: {list(d)}") # [4, 5, 1, 2, 3]

d.rotate(-3) # Move 3 elements from left to right
print(f"Rotate left 3: {list(d)}") # [2, 3, 4, 5, 1]

# Search and modify
d = deque([1, 2, 3, 2, 4, 2, 5])
print(f"Count of 2: {d.count(2)}") # 3
print(f"Index of 2: {d.index(2)}") # 1
print(f"Index of 2 after pos 2: {d.index(2, 2)}") # 3

d.remove(2) # Remove first occurrence
print(f"After removing 2: {list(d)}") # [1, 3, 2, 4, 2, 5]

# Insert (O(n) operation)
d.insert(2, 'inserted')
print(f"After insert: {list(d)}") # [1, 3, 'inserted', 2, 4, 2, 5]

# Reverse
d = deque([1, 2, 3, 4, 5])
d.reverse()
print(f"Reversed: {list(d)}") # [5, 4, 3, 2, 1]

# Copy
original = deque([1, 2, 3])
copy_d = original.copy()
copy_d.append(4)
print(f"Original: {list(original)}") # [1, 2, 3]
print(f"Copy: {list(copy_d)}") # [1, 2, 3, 4]

# Clear
d.clear()
print(f"After clear: {list(d)}") # []

# Maxlen behavior
bounded = deque([1, 2, 3], maxlen=3)
bounded.append(4) # Evicts 1
print(f"Bounded deque: {list(bounded)}") # [2, 3, 4]
bounded.appendleft(0) # Evicts 4
print(f"After appendleft: {list(bounded)}") # [0, 2, 3]

Important Notes

  • extendleft() reverses order: When using extendleft([1, 2, 3]), elements are added as 3, 2, 1
  • rotate() direction: Positive values rotate right, negative values rotate left
  • maxlen enforcement: When maxlen is set, adding beyond capacity automatically removes from the opposite end
  • Thread safety: append, appendleft, pop, popleft are atomic operations
  • Index access: Unlike lists, deque doesn't support efficient random access (O(n) for middle elements)

🎯 Primary Use Cases

1. Queue Implementation (FIFO)

Use Case: Producer-consumer patterns, task processing, buffering data streams. Why deque: O(1) append and popleft operations make it perfect for queue operations.

from collections import deque

# Simple queue
queue = deque()
queue.append("task1") # Add to right (enqueue)
queue.append("task2")
item = queue.popleft() # Remove from left (dequeue) - "task1"

# Bounded queue with automatic eviction
bounded_queue = deque(maxlen=3)
for i in range(5):
bounded_queue.append(f"task_{i}")
print(bounded_queue) # deque(['task_2', 'task_3', 'task_4'], maxlen=3)

2. Stack Implementation (LIFO)

Use Case: Expression evaluation, undo/redo operations, function call management. Why deque: O(1) append and pop operations from the same end create perfect stack behavior.

from collections import deque

# Simple stack
stack = deque()
stack.append("item1") # Push
stack.append("item2")
item = stack.pop() # Pop - "item2" (last in, first out)

# Expression evaluation example
def evaluate_postfix(expression):
stack = deque()
for token in expression.split():
if token in "+-*/":
b, a = stack.pop(), stack.pop()
result = eval(f"{a} {token} {b}")
stack.append(result)
else:
stack.append(float(token))
return stack.pop()

result = evaluate_postfix("3 4 + 2 *") # (3 + 4) * 2 = 14

3. Sliding Window Algorithms

Use Case: Moving averages, time-series analysis, algorithm problems requiring fixed-size windows. Why deque: Efficient addition/removal from both ends enables optimal sliding window operations.

from collections import deque

# Moving average with fixed window
class MovingAverage:
def __init__(self, size):
self.size = size
self.queue = deque(maxlen=size)
self.sum = 0

def next(self, val):
if len(self.queue) == self.size:
self.sum -= self.queue[0] # Remove oldest
self.queue.append(val)
self.sum += val
return self.sum / len(self.queue)

# Sliding window maximum
def sliding_window_max(nums, k):
dq = deque() # Store indices
result = []

for i, num in enumerate(nums):
# Remove indices outside window
while dq and dq[0] <= i - k:
dq.popleft()

# Remove smaller elements
while dq and nums[dq[-1]] <= num:
dq.pop()

dq.append(i)

if i >= k - 1:
result.append(nums[dq[0]])

return result

# Usage
ma = MovingAverage(3)
print(ma.next(1)) # 1.0
print(ma.next(10)) # 5.5
print(ma.next(3)) # 4.67
print(ma.next(5)) # 6.0 (avg of 10,3,5)

nums = [1, 3, -1, -3, 5, 3, 6, 7]
result = sliding_window_max(nums, 3) # [3, 3, 5, 5, 6, 7]

4. LRU Cache Implementation

Use Case: Caching with automatic eviction of least recently used items. Why deque: Efficiently tracks access order with O(1) operations for cache management.

from collections import deque

class LRUCache:
def __init__(self, capacity):
self.capacity = capacity
self.cache = {} # key -> value
self.order = deque() # Track access order (LRU -> MRU)

def get(self, key):
if key not in self.cache:
return -1

# Move to end (most recently used)
self.order.remove(key)
self.order.append(key)
return self.cache[key]

def put(self, key, value):
if key in self.cache:
# Update existing
self.cache[key] = value
self.order.remove(key)
self.order.append(key)
else:
# Add new
if len(self.cache) >= self.capacity:
# Evict LRU
lru_key = self.order.popleft()
del self.cache[lru_key]

self.cache[key] = value
self.order.append(key)

# Usage
cache = LRUCache(2)
cache.put(1, "one")
cache.put(2, "two")
print(cache.get(1)) # "one" (moves 1 to MRU)
cache.put(3, "three") # evicts key 2
print(cache.get(2)) # -1 (not found)

5. Breadth-First Search (BFS)

Use Case: Graph traversal, shortest path algorithms, level-order tree traversal. Why deque: FIFO behavior ensures nodes are processed in correct breadth-first order.

from collections import deque

def bfs(graph, start, target=None):
"""Breadth-first search using deque."""
if start not in graph:
return []

visited = set()
queue = deque([start])
path = []

while queue:
vertex = queue.popleft()

if vertex in visited:
continue

visited.add(vertex)
path.append(vertex)

if target and vertex == target:
return path

# Add unvisited neighbors
for neighbor in graph.get(vertex, []):
if neighbor not in visited:
queue.append(neighbor)

return path

def shortest_path(graph, start, target):
"""Find shortest path using BFS."""
queue = deque([(start, [start])])
visited = {start}

while queue:
vertex, path = queue.popleft()

if vertex == target:
return path

for neighbor in graph.get(vertex, []):
if neighbor not in visited:
visited.add(neighbor)
queue.append((neighbor, path + [neighbor]))

return None

# Usage
graph = {
'A': ['B', 'C'],
'B': ['D', 'E'],
'C': ['F'],
'D': ['G'],
'E': ['G'],
'F': ['G'],
'G': []
}

path = bfs(graph, 'A') # ['A', 'B', 'C', 'D', 'E', 'F', 'G']
shortest = shortest_path(graph, 'A', 'G') # ['A', 'B', 'D', 'G']

🚀 Advanced Real-World Applications

1. Circular Log Buffer

Use Case: High-performance logging with automatic rotation and memory management. Why deque: maxlen parameter provides automatic eviction of old entries.

from collections import deque
import time

class CircularLogBuffer:
def __init__(self, maxsize=1000):
self.buffer = deque(maxlen=maxsize)

def add_log(self, level, message):
entry = {
'timestamp': time.time(),
'level': level,
'message': message,
'formatted_time': time.strftime('%Y-%m-%d %H:%M:%S')
}
self.buffer.append(entry) # Auto-evicts oldest when full

def get_recent_logs(self, count=10):
return list(self.buffer)[-count:]

def search_logs(self, keyword):
return [entry for entry in self.buffer
if keyword.lower() in entry['message'].lower()]

# Usage
log_buffer = CircularLogBuffer(maxsize=100)
log_buffer.add_log('INFO', 'Server started')
log_buffer.add_log('ERROR', 'Database connection failed')

recent = log_buffer.get_recent_logs(5)
errors = log_buffer.search_logs('error')

2. Rate Limiting with Token Bucket

Use Case: API rate limiting, request throttling, bandwidth control. Why deque: Efficiently tracks token timestamps and provides automatic capacity management.

from collections import deque
import time

class TokenBucket:
def __init__(self, capacity, refill_rate):
self.capacity = capacity
self.refill_rate = refill_rate # tokens per second
self.tokens = deque(maxlen=capacity)
self.last_refill = time.time()

# Fill initial tokens
current_time = time.time()
for _ in range(capacity):
self.tokens.append(current_time)

def consume(self, tokens=1):
"""Try to consume tokens from bucket."""
self._refill_tokens()

if len(self.tokens) >= tokens:
for _ in range(tokens):
self.tokens.popleft()
return True
return False

def _refill_tokens(self):
"""Refill tokens based on time elapsed."""
current_time = time.time()
time_elapsed = current_time - self.last_refill
tokens_to_add = int(time_elapsed * self.refill_rate)

for _ in range(min(tokens_to_add, self.capacity - len(self.tokens))):
self.tokens.append(current_time)

self.last_refill = current_time

# Usage
rate_limiter = TokenBucket(capacity=10, refill_rate=5) # 5 tokens/sec

def make_api_request(endpoint):
if rate_limiter.consume(1):
print(f"✓ Request to {endpoint} allowed")
return f"Response from {endpoint}"
else:
print(f"✗ Request to {endpoint} rate limited")
return None

# Simulate burst requests
for i in range(15):
make_api_request(f"/api/data/{i}")
time.sleep(0.1)

3. Task Scheduler with Priority Queues

Use Case: Job scheduling, task management systems, event processing. Why deque: Perfect for ready task queues combined with priority scheduling.

from collections import deque
import heapq
import time
from dataclasses import dataclass

@dataclass
class Task:
priority: int
scheduled_time: float
name: str
func: callable

def __lt__(self, other):
if self.scheduled_time == other.scheduled_time:
return self.priority < other.priority
return self.scheduled_time < other.scheduled_time

class TaskScheduler:
def __init__(self):
self.scheduled_tasks = [] # heapq for future tasks
self.ready_queue = deque() # deque for ready tasks
self.completed = deque(maxlen=50) # Recent completions

def schedule_task(self, func, name, delay=0, priority=0):
"""Schedule a task for execution."""
scheduled_time = time.time() + delay
task = Task(priority, scheduled_time, name, func)
heapq.heappush(self.scheduled_tasks, task)

def process_ready_tasks(self):
"""Move ready tasks to execution queue."""
current_time = time.time()

while (self.scheduled_tasks and
self.scheduled_tasks[0].scheduled_time <= current_time):
ready_task = heapq.heappop(self.scheduled_tasks)
self.ready_queue.append(ready_task)

def execute_next_task(self):
"""Execute the next ready task."""
if not self.ready_queue:
return None

task = self.ready_queue.popleft()
start_time = time.time()

try:
result = task.func()
duration = time.time() - start_time

completion = {
'name': task.name,
'duration': duration,
'status': 'completed',
'result': result
}
self.completed.append(completion)
return completion

except Exception as e:
completion = {
'name': task.name,
'duration': time.time() - start_time,
'status': 'failed',
'error': str(e)
}
self.completed.append(completion)
return completion

# Usage
scheduler = TaskScheduler()

def sample_task(name, duration=0.1):
def task():
print(f"Executing {name}")
time.sleep(duration)
return f"Result from {name}"
return task

# Schedule tasks with different priorities and delays
scheduler.schedule_task(sample_task("low_priority"), "low", delay=1, priority=5)
scheduler.schedule_task(sample_task("high_priority"), "high", delay=1, priority=1)
scheduler.schedule_task(sample_task("immediate"), "immediate", delay=0, priority=3)

# Process and execute
while scheduler.scheduled_tasks or scheduler.ready_queue:
scheduler.process_ready_tasks()
result = scheduler.execute_next_task()
if result:
print(f"Task {result['name']}: {result['status']}")
time.sleep(0.1)

📊 Performance Considerations

Time Complexity Comparison

import time
from collections import deque

def benchmark_operations():
"""Compare deque vs list performance."""

# Test data size
n = 100000

# Initialize structures
d = deque()
lst = []

print("=== Append Operations ===")

# Test append (right end)
start = time.time()
for i in range(n):
d.append(i)
deque_append_time = time.time() - start

start = time.time()
for i in range(n):
lst.append(i)
list_append_time = time.time() - start

print(f"Deque append: {deque_append_time:.4f}s")
print(f"List append: {list_append_time:.4f}s")
print(f"Ratio: {deque_append_time/list_append_time:.2f}x")

print("\n=== Appendleft/Insert Operations ===")

# Reset structures
d.clear()
lst.clear()

# Test appendleft vs insert(0)
start = time.time()
for i in range(10000): # Smaller n for list insert
d.appendleft(i)
deque_appendleft_time = time.time() - start

start = time.time()
for i in range(10000):
lst.insert(0, i)
list_insert_time = time.time() - start

print(f"Deque appendleft: {deque_appendleft_time:.4f}s")
print(f"List insert(0): {list_insert_time:.4f}s")
print(f"Ratio: {deque_appendleft_time/list_insert_time:.2f}x")

print("\n=== Pop Operations ===")

# Reset and populate
d = deque(range(n))
lst = list(range(n))

# Test pop (right end)
start = time.time()
while d:
d.pop()
deque_pop_time = time.time() - start

start = time.time()
while lst:
lst.pop()
list_pop_time = time.time() - start

print(f"Deque pop: {deque_pop_time:.4f}s")
print(f"List pop: {list_pop_time:.4f}s")
print(f"Ratio: {deque_pop_time/list_pop_time:.2f}x")

print("\n=== Popleft Operations ===")

# Reset and populate
d = deque(range(10000)) # Smaller for list
lst = list(range(10000))

# Test popleft vs pop(0)
start = time.time()
while d:
d.popleft()
deque_popleft_time = time.time() - start

start = time.time()
while lst:
lst.pop(0)
list_pop0_time = time.time() - start

print(f"Deque popleft: {deque_popleft_time:.4f}s")
print(f"List pop(0): {list_pop0_time:.4f}s")
print(f"Ratio: {deque_popleft_time/list_pop0_time:.2f}x")

benchmark_operations()

Memory Usage Analysis

import sys
from collections import deque

def analyze_memory_usage():
"""Analyze memory usage of deque vs list."""

sizes = [100, 1000, 10000, 100000]

print("=== Memory Usage Analysis ===")
print(f"{'Size':<10} {'Deque (bytes)':<15} {'List (bytes)':<15} {'Difference':<12}")
print("-" * 55)

for size in sizes:
# Create structures
d = deque(range(size))
lst = list(range(size))

# Measure memory
deque_size = sys.getsizeof(d)
list_size = sys.getsizeof(lst)
difference = list_size - deque_size

print(f"{size:<10} {deque_size:<15} {list_size:<15} {difference:<12}")

analyze_memory_usage()

🎯 When to Use Deque

✅ Ideal Use Cases

  • Queue Implementation - FIFO data structures
  • Stack Implementation - LIFO data structures with O(1) operations
  • Sliding Window Algorithms - Fixed-size windows over data streams
  • BFS Algorithms - Graph traversal requiring queue
  • LRU Caches - Tracking access order
  • Circular Buffers - Fixed-size logging or data buffers
  • Undo/Redo Systems - Command pattern implementations
  • Task Scheduling - Ready queues in schedulers

❌ When NOT to Use Deque

  • Random Access Required - Use list for O(1) indexing
  • Sorting Needed - Lists have built-in sort methods
  • Mathematical Operations - NumPy arrays for numerical computation
  • Memory Critical - For simple sequential access, lists might be more efficient
  • Slicing Operations - Lists provide more efficient slicing

🔧 Key Industries and Applications

  • Web Development - Request queues, session management, caching
  • Game Development - Game loops, animation queues, event systems
  • Financial Systems - Order books, transaction queues, real-time data
  • System Administration - Log processing, monitoring, task queues
  • Data Processing - Stream processing, ETL pipelines, batch processing
  • Network Programming - Packet buffers, connection pools, message queues

📖 Additional Learning Resources

Official Python Resources

Algorithm and Data Structure Resources

Video Tutorials

Advanced Topics

💡 Best Practices

  1. Choose Right Operation - Use appendleft/popleft for queue, append/pop for stack
  2. Consider maxlen - Use maxlen for circular buffers and memory control
  3. Thread Safety - Deque operations are atomic, but compound operations need locking
  4. Performance Awareness - Avoid index access in middle of deque (O(n) operation)
  5. Memory Management - Use maxlen to prevent unbounded growth
  6. Clear Intent - Use descriptive variable names (queue, stack, buffer)

🚀 Advanced Patterns

Thread-Safe Deque Operations

Use Case: Multi-threaded applications requiring safe concurrent access to shared queues. Why deque: Basic operations are atomic, but complex operations need additional synchronization.

from collections import deque
import threading
import time

class ThreadSafeDeque:
def __init__(self, maxlen=None):
self._deque = deque(maxlen=maxlen)
self._lock = threading.RLock()
self._not_empty = threading.Condition(self._lock)

def append_wait(self, item, timeout=None):
"""Append with waiting if needed."""
with self._lock:
self._deque.append(item)
self._not_empty.notify()

def popleft_wait(self, timeout=None):
"""Popleft with waiting if empty."""
with self._not_empty:
while not self._deque:
if not self._not_empty.wait(timeout):
raise TimeoutError("Queue empty")

item = self._deque.popleft()
return item

def bulk_append(self, items):
"""Append multiple items atomically."""
with self._lock:
for item in items:
self._deque.append(item)
self._not_empty.notify_all()

def bulk_popleft(self, count):
"""Pop multiple items atomically."""
with self._lock:
items = []
for _ in range(min(count, len(self._deque))):
items.append(self._deque.popleft())
return items

# Usage
safe_queue = ThreadSafeDeque(maxlen=10)

def producer():
for i in range(5):
safe_queue.append_wait(f"item_{i}")
print(f"Produced: item_{i}")
time.sleep(0.1)

def consumer():
for _ in range(5):
try:
item = safe_queue.popleft_wait(timeout=1)
print(f"Consumed: {item}")
except TimeoutError:
print("Timeout waiting for item")

# Run producer and consumer
threading.Thread(target=producer).start()
threading.Thread(target=consumer).start()

Deque is an essential data structure for any Python developer working with queues, stacks, or algorithms requiring efficient operations at both ends. Its versatility and performance make it the go-to choice for many real-world applications involving data flow and buffering.