Python Engineer - Interview Q&A Guide
Basic Questions (5)
1. What are the key differences between Python 2 and Python 3, and what are Python's main features?
Answer: Python 3 key differences:
- Print is a function: print("hello") vs. print "hello" in Python 2
- Unicode by default: Strings are Unicode in Python 3
- Integer division: / always returns a float; // performs floor division
- range() returns a lazy range object, not a list
Python features: Dynamic typing, interpreted language, object-oriented with multiple inheritance, extensive standard library, cross-platform compatibility, strong community and ecosystem.
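The division and range() changes are easy to demonstrate (assuming a Python 3 interpreter):
print(7 / 2)    # 3.5 -> true division always returns a float
print(7 // 2)   # 3   -> floor division for integer results
r = range(5)    # lazy range object, not a list
print(list(r))  # [0, 1, 2, 3, 4] -> materialized only when needed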
2. How do you reverse a string in Python? Show 3 different ways.
Answer:
s = "hello"
# Method 1: Slicing
reversed1 = s[::-1]
# Method 2: Built-in function
reversed2 = ''.join(reversed(s))
# Method 3: Loop
reversed3 = ''.join(s[i] for i in range(len(s)-1, -1, -1))
3. What is the difference between the == and is operators? Give a practical example.
Answer:
== compares values (equality); is compares object identity (the same object in memory).
a = [1, 2, 3]
b = [1, 2, 3]
c = a
print(a == b) # True (same values)
print(a is b) # False (different objects)
print(a is c) # True (same object)
4. How do you find duplicate elements in a list?
Answer:
# Method 1: Using set + count (simple, but O(n^2) for large lists)
nums = [1, 2, 3, 2, 4, 3]
duplicates = list(set(x for x in nums if nums.count(x) > 1))
# Method 2: Single pass with a seen set (O(n), reports each duplicate once)
seen, dupes = set(), set()
for x in nums:
    if x in seen:
        dupes.add(x)
    seen.add(x)
duplicates = list(dupes)
5. What are list comprehensions? Write a comprehension to get even squares from 1 to 10.
Answer: List comprehensions provide a concise way to create lists:
# Basic syntax: [expression for item in iterable if condition]
even_squares = [x**2 for x in range(1, 11) if x % 2 == 0]
# Result: [4, 16, 36, 64, 100]
Intermediate Questions (20)
6. How do you remove specific lines from a file based on content?
Answer:
# Remove lines containing "DELETE"
with open('input.txt', 'r') as infile, open('output.txt', 'w') as outfile:
    for line in infile:
        if "DELETE" not in line:
            outfile.write(line)
# In-place modification (read everything, then rewrite the file)
with open('file.txt') as f:
    lines = f.readlines()
with open('file.txt', 'w') as f:
    f.writelines(line for line in lines if 'DELETE' not in line)
7. Write a function to find the second largest number in a list.
Answer:
def second_largest(numbers):
    if len(numbers) < 2:
        return None
    unique_nums = list(set(numbers))
    unique_nums.sort()
    return unique_nums[-2] if len(unique_nums) >= 2 else None
# One-liner approach
second_largest = lambda nums: sorted(set(nums))[-2] if len(set(nums)) >= 2 else None
8. How do you count word frequency in a text? Handle case sensitivity.
Answer:
from collections import Counter
import re
def word_frequency(text):
    words = re.findall(r'\b\w+\b', text.lower())
    return Counter(words)
# Usage
text = "Hello world. Hello Python world!"
freq = word_frequency(text) # {'hello': 2, 'world': 2, 'python': 1}
9. Implement a function to check if a string is a palindrome (ignore spaces and case).
Answer:
def is_palindrome(s):
    cleaned = ''.join(c.lower() for c in s if c.isalnum())
    return cleaned == cleaned[::-1]
# Alternative using two pointers
def is_palindrome_v2(s):
    left, right = 0, len(s) - 1
    while left < right:
        if not s[left].isalnum():
            left += 1
        elif not s[right].isalnum():
            right -= 1
        elif s[left].lower() != s[right].lower():
            return False
        else:
            left, right = left + 1, right - 1
    return True
10. How do you flatten a nested list?
Answer:
# Recursive approach
def flatten(lst):
    result = []
    for item in lst:
        if isinstance(item, list):
            result.extend(flatten(item))
        else:
            result.append(item)
    return result
# Using itertools (for one level)
from itertools import chain
flat = list(chain.from_iterable(nested_list))
11. Write a function to rotate a list by n positions.
Answer:
def rotate_list(lst, n):
    if not lst:
        return lst
    n = n % len(lst)  # Handle n > len(lst)
    return lst[-n:] + lst[:-n]
# In-place rotation using slicing
def rotate_inplace(lst, n):
    if not lst:
        return
    n = n % len(lst)
    lst[:] = lst[-n:] + lst[:-n]
12. How do you find missing numbers in a sequence from 1 to n?
Answer:
def find_missing(nums, n):
    # Method 1: Using set
    present = set(nums)
    return [i for i in range(1, n+1) if i not in present]
# Method 2: Mathematical approach
def find_missing_math(nums, n):
    expected_sum = n * (n + 1) // 2
    actual_sum = sum(nums)
    return expected_sum - actual_sum  # Works for a single missing number
13. Implement a simple cache decorator with expiration.
Answer:
import time
from functools import wraps
def cache_with_timeout(timeout_seconds):
    def decorator(func):
        cache = {}
        @wraps(func)
        def wrapper(*args):
            key = args
            if key in cache:
                result, timestamp = cache[key]
                if time.time() - timestamp < timeout_seconds:
                    return result
            result = func(*args)
            cache[key] = (result, time.time())
            return result
        return wrapper
    return decorator
14. How do you merge two sorted lists into one sorted list?
Answer:
def merge_sorted_lists(list1, list2):
    merged = []
    i = j = 0
    while i < len(list1) and j < len(list2):
        if list1[i] <= list2[j]:
            merged.append(list1[i])
            i += 1
        else:
            merged.append(list2[j])
            j += 1
    merged.extend(list1[i:])
    merged.extend(list2[j:])
    return merged
15. Write a function to group anagrams together.
Answer:
from collections import defaultdict
def group_anagrams(words):
    groups = defaultdict(list)
    for word in words:
        key = ''.join(sorted(word.lower()))
        groups[key].append(word)
    return list(groups.values())
# Usage
words = ["eat", "tea", "tan", "ate", "nat", "bat"]
# Result: [['eat', 'tea', 'ate'], ['tan', 'nat'], ['bat']]
16. How do you find the intersection of two lists while preserving order?
Answer:
def intersection_ordered(list1, list2):
    set2 = set(list2)
    return [x for x in list1 if x in set2]
# To maintain unique elements
def intersection_unique(list1, list2):
    seen = set()
    result = []
    set2 = set(list2)
    for x in list1:
        if x in set2 and x not in seen:
            result.append(x)
            seen.add(x)
    return result
17. Implement a function to convert Roman numerals to integers.
Answer:
def roman_to_int(roman):
    values = {'I': 1, 'V': 5, 'X': 10, 'L': 50,
              'C': 100, 'D': 500, 'M': 1000}
    total = 0
    prev_value = 0
    for char in reversed(roman):
        value = values[char]
        if value < prev_value:
            total -= value
        else:
            total += value
        prev_value = value
    return total
18. How do you remove duplicates from a list while preserving order?
Answer:
def remove_duplicates_ordered(lst):
    seen = set()
    result = []
    for item in lst:
        if item not in seen:
            seen.add(item)
            result.append(item)
    return result
# Using dict (insertion order is guaranteed in Python 3.7+)
def remove_duplicates_dict(lst):
    return list(dict.fromkeys(lst))
19. Write a function to find the longest common prefix among strings.
Answer:
def longest_common_prefix(strings):
    if not strings:
        return ""
    min_len = min(len(s) for s in strings)
    for i in range(min_len):
        char = strings[0][i]
        if not all(s[i] == char for s in strings):
            return strings[0][:i]
    return strings[0][:min_len]
20. How do you implement a simple rate limiter?
Answer:
import time
from collections import defaultdict
class RateLimiter:
    def __init__(self, max_requests, time_window):
        self.max_requests = max_requests
        self.time_window = time_window
        self.requests = defaultdict(list)

    def is_allowed(self, user_id):
        now = time.time()
        user_requests = self.requests[user_id]
        # Remove requests that fall outside the time window
        user_requests[:] = [req_time for req_time in user_requests
                            if now - req_time < self.time_window]
        if len(user_requests) < self.max_requests:
            user_requests.append(now)
            return True
        return False
21. How do you read a large file efficiently without loading it entirely into memory?
Answer:
def read_large_file_chunks(filename, chunk_size=8192):
    with open(filename, 'r') as file:
        while True:
            chunk = file.read(chunk_size)
            if not chunk:
                break
            yield chunk
# Process line by line
def process_large_file(filename):
    with open(filename, 'r') as file:
        for line_num, line in enumerate(file, 1):
            # Process each line individually
            yield line_num, line.strip()
22. Implement a function to validate email addresses using regex.
Answer:
import re
def is_valid_email(email):
    pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    return bool(re.match(pattern, email))
def extract_emails_from_text(text):
    pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
    return re.findall(pattern, text)
# Batch validation
def validate_email_list(emails):
    return {email: is_valid_email(email) for email in emails}
23. How do you implement a simple retry mechanism with exponential backoff?
Answer:
import time
import random
from functools import wraps
def retry_with_backoff(max_retries=3, base_delay=1, max_delay=60):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise e
                    delay = min(base_delay * (2 ** attempt), max_delay)
                    jitter = random.uniform(0, 0.1) * delay
                    time.sleep(delay + jitter)
        return wrapper
    return decorator
24. Write a function to convert a nested dictionary to dot notation.
Answer:
def flatten_dict(nested_dict, parent_key='', sep='.'):
    items = []
    for key, value in nested_dict.items():
        new_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            items.extend(flatten_dict(value, new_key, sep).items())
        else:
            items.append((new_key, value))
    return dict(items)
# Example: {'a': {'b': {'c': 1}}} -> {'a.b.c': 1}
25. How do you implement a simple thread-safe counter?
Answer:
import threading
class ThreadSafeCounter:
    def __init__(self, initial_value=0):
        self._value = initial_value
        self._lock = threading.Lock()

    def increment(self, amount=1):
        with self._lock:
            self._value += amount
            return self._value

    def get_value(self):
        with self._lock:
            return self._value
Complex Questions (20)
26. Design a scalable web scraping system with rate limiting and error handling.
Answer: High-level approach:
- Architecture: Use async/await with aiohttp for concurrent requests
- Rate Limiting: Implement token bucket algorithm per domain
- Error Handling: Exponential backoff retry with circuit breaker pattern
- Data Storage: Queue system (Redis/RabbitMQ) for URLs and results
- Monitoring: Track success rates, response times, and error patterns
- Scaling: Distribute across multiple workers with shared state
- Compliance: Respect robots.txt and implement delays per site
Key components: RateLimiter class, RetryHandler, CircuitBreaker, URLQueue, DataProcessor, and WorkerPool manager.
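A minimal sketch of the per-domain token-bucket limiter mentioned above, assuming asyncio-based workers (the TokenBucket class and its refill parameters are illustrative, not from a specific library):
import asyncio
import time

class TokenBucket:
    """Illustrative per-domain bucket: refills at `rate` tokens/sec up to `capacity`."""
    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)
        self.updated = time.monotonic()

    async def acquire(self):
        while True:
            now = time.monotonic()
            # Refill based on elapsed time, capped at capacity
            self.tokens = min(self.capacity, self.tokens + (now - self.updated) * self.rate)
            self.updated = now
            if self.tokens >= 1:
                self.tokens -= 1
                return
            await asyncio.sleep((1 - self.tokens) / self.rate)  # wait until a token is available
Each worker would call await buckets[domain].acquire() before issuing an aiohttp request for that domain, keeping the per-site delay requirement.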
27. Implement a real-time data processing pipeline with Kafka integration.
Answer: Pipeline design:
- Producer: Kafka producer for real-time data ingestion
- Consumer Groups: Multiple consumers for parallel processing
- Stream Processing: Process messages in micro-batches
- Error Handling: Dead letter queues for failed messages
- State Management: Redis/database for processing state
- Monitoring: Track lag, throughput, and error rates
- Scaling: Auto-scale consumers based on queue depth
Key patterns: Consumer-producer pattern, batch processing, checkpoint management, and graceful shutdown handling.
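A hedged sketch of the consumer side using the kafka-python client; the topic names, process_message function, and dead-letter topic are placeholders:
import json
from kafka import KafkaConsumer, KafkaProducer

consumer = KafkaConsumer(
    "events",                                   # assumed input topic
    bootstrap_servers="localhost:9092",
    group_id="event-processors",                # consumer group for parallel processing
    enable_auto_commit=False,                   # commit only after successful processing
    value_deserializer=lambda b: json.loads(b.decode("utf-8")),
)
producer = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda v: json.dumps(v).encode("utf-8"),
)

for message in consumer:
    try:
        process_message(message.value)          # placeholder business logic
        consumer.commit()                       # checkpoint progress
    except Exception:
        producer.send("events.deadletter", message.value)  # route failures to a DLQ topic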
28. Build a microservices architecture with FastAPI including authentication and database integration.
Answer: Microservices design:
- Service Structure: Separate services for auth, user management, business logic
- API Gateway: Central routing and authentication validation
- Database: Service-specific databases with connection pooling
- Authentication: JWT tokens with refresh mechanism
- Inter-service Communication: HTTP/gRPC with circuit breakers
- Service Discovery: Registry pattern for service location
- Deployment: Docker containers with health checks
Core components: FastAPI apps, SQLAlchemy models, Pydantic schemas, dependency injection, and middleware.
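A minimal sketch of JWT validation as a FastAPI dependency, assuming PyJWT; the SECRET_KEY handling and returned user shape are illustrative:
import os
import jwt                                      # PyJWT
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
SECRET_KEY = os.getenv("SECRET_KEY", "change-me")   # assumed env-based secret

def get_current_user(token: str = Depends(oauth2_scheme)) -> dict:
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
    except jwt.PyJWTError:
        raise HTTPException(status_code=401, detail="Invalid token")
    return {"user_id": payload.get("sub")}

@app.get("/me")
async def read_me(user: dict = Depends(get_current_user)):
    return user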
29. Create a machine learning pipeline with model training, validation, and deployment.
Answer: ML pipeline architecture:
- Data Pipeline: ETL for feature engineering and validation
- Model Training: Automated training with hyperparameter tuning
- Model Validation: Cross-validation and performance metrics
- Model Registry: Version control for models and metadata
- Deployment: A/B testing framework for model comparison
- Monitoring: Model drift detection and performance tracking
- Feedback Loop: Continuous learning from production data
Implementation approach: Use MLflow for tracking, scikit-learn for models, and FastAPI for serving endpoints.
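A small sketch of the tracking step, assuming MLflow and scikit-learn are available; X and y are placeholder feature/label arrays:
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)  # X, y assumed to exist

with mlflow.start_run():
    model = RandomForestClassifier(n_estimators=200)
    model.fit(X_train, y_train)
    acc = accuracy_score(y_test, model.predict(X_test))
    mlflow.log_param("n_estimators", 200)       # record hyperparameters
    mlflow.log_metric("accuracy", acc)          # record validation metrics
    mlflow.sklearn.log_model(model, "model")    # store the model for later deployment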
30. Design a distributed task queue system using Celery and Redis.
Answer: Task queue architecture:
- Message Broker: Redis for task queue and result storage
- Worker Management: Multiple worker processes with auto-scaling
- Task Routing: Route tasks to specific workers based on type
- Monitoring: Task status tracking and performance metrics
- Error Handling: Retry logic with exponential backoff
- Priority Queues: Different priority levels for task execution
- Workflow Management: Task dependencies and chaining
Key features: Task decorators, result callbacks, periodic tasks, and worker health monitoring.
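A minimal sketch of a Celery task with retry and exponential backoff, assuming a local Redis broker; send_report and TransientError are placeholders:
from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379/0", backend="redis://localhost:6379/1")

@app.task(bind=True, max_retries=3, default_retry_delay=10)
def generate_report(self, report_id):
    try:
        send_report(report_id)                  # placeholder for the real work
    except TransientError as exc:               # assumed retryable error type
        # Exponential backoff: 1s, 2s, 4s ... before giving up after max_retries
        raise self.retry(exc=exc, countdown=2 ** self.request.retries)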
31. Build a real-time chat application using WebSockets and async Python.
Answer: Chat system design:
- WebSocket Management: Connection handling and user authentication
- Message Broadcasting: Efficient message distribution to connected users
- Room Management: User groups and private messaging
- Message Persistence: Store chat history in database
- Presence System: Online/offline status tracking
- Load Balancing: Distribute connections across multiple servers
- Security: Message validation and rate limiting
Technical approach: AsyncIO for concurrency, Redis for pub/sub, and database for persistence.
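A minimal sketch of room-based broadcasting with FastAPI WebSockets; this keeps connections in process memory, whereas a multi-server deployment would move the fan-out to Redis pub/sub:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()
rooms: dict[str, set[WebSocket]] = {}           # room name -> connected sockets

@app.websocket("/ws/{room}")
async def chat(websocket: WebSocket, room: str):
    await websocket.accept()
    rooms.setdefault(room, set()).add(websocket)
    try:
        while True:
            text = await websocket.receive_text()
            for peer in rooms[room]:            # broadcast to everyone in the room
                await peer.send_text(text)
    except WebSocketDisconnect:
        rooms[room].discard(websocket)          # clean up on disconnect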
32. Implement a caching system with Redis for high-performance applications.
Answer: Caching strategy:
- Cache Patterns: Implement cache-aside, write-through, and write-behind
- Eviction Policies: LRU, LFU, and TTL-based expiration
- Cache Warming: Pre-populate cache with frequently accessed data
- Distributed Caching: Consistent hashing for cache distribution
- Cache Invalidation: Smart invalidation based on data dependencies
- Monitoring: Hit rates, memory usage, and performance metrics
- Fallback Strategy: Graceful degradation when cache is unavailable
Implementation details: Decorator patterns, pipeline operations, and connection pooling.
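A hedged sketch of the cache-aside pattern with graceful fallback when Redis is unavailable; the key format, TTL, and load_product_from_db are illustrative:
import json
import redis

cache = redis.Redis(host="localhost", port=6379, decode_responses=True)

def get_product(product_id: int) -> dict:
    key = f"product:{product_id}"
    try:
        cached = cache.get(key)
        if cached:
            return json.loads(cached)           # cache hit
    except redis.RedisError:
        pass                                    # degrade gracefully: fall through to the database
    product = load_product_from_db(product_id)  # placeholder database call
    try:
        cache.setex(key, 300, json.dumps(product))  # TTL-based expiration (5 minutes)
    except redis.RedisError:
        pass
    return product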
33. Design a data validation and transformation system using Pydantic.
Answer: Validation framework:
- Schema Definition: Comprehensive data models with validation rules
- Custom Validators: Business logic validation functions
- Error Handling: Detailed error reporting and aggregation
- Data Transformation: Automatic type conversion and cleaning
- Batch Processing: Validate large datasets efficiently
- API Integration: Seamless FastAPI integration
- Configuration: Environment-based validation rules
Core concepts: BaseModel classes, validator decorators, and error serialization.
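A small sketch using Pydantic v1-style validators (v2 renames the decorator to field_validator); the Order model and its rules are illustrative:
from pydantic import BaseModel, ValidationError, validator

class Order(BaseModel):
    order_id: int
    email: str
    quantity: int = 1

    @validator("email")
    def email_must_contain_at(cls, v):
        if "@" not in v:
            raise ValueError("invalid email address")
        return v.lower()                        # normalize during validation

    @validator("quantity")
    def quantity_positive(cls, v):
        if v <= 0:
            raise ValueError("quantity must be positive")
        return v

try:
    Order(order_id="7", email="USER@example.com")   # "7" is coerced to the int 7
except ValidationError as e:
    print(e.json())                             # detailed, serializable error report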
34. Build a monitoring and alerting system for Python applications.
Answer: Monitoring architecture:
- Metrics Collection: System and application metrics gathering
- Health Checks: Service availability and dependency monitoring
- Alert Rules: Threshold-based and anomaly detection alerting
- Notification System: Multi-channel alert delivery (email, Slack, SMS)
- Dashboard: Real-time metrics visualization
- Log Aggregation: Centralized logging with structured data
- Performance Tracking: Response times and error rate monitoring
Technology stack: Prometheus metrics, structured logging, and alert manager integration.
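A minimal sketch of metrics collection with prometheus_client; the metric names and do_work handler are illustrative:
import time
from prometheus_client import Counter, Histogram, start_http_server

REQUESTS = Counter("app_requests_total", "Total requests handled", ["endpoint"])
LATENCY = Histogram("app_request_seconds", "Request latency in seconds", ["endpoint"])

def handle_request(endpoint: str):
    start = time.perf_counter()
    try:
        do_work(endpoint)                       # placeholder for the real handler
        REQUESTS.labels(endpoint=endpoint).inc()
    finally:
        LATENCY.labels(endpoint=endpoint).observe(time.perf_counter() - start)

start_http_server(8000)                         # exposes /metrics for Prometheus to scrape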
35. Implement a file processing system for large files.
Answer: File processing strategy:
- Streaming Processing: Process files without loading into memory
- Parallel Processing: Multi-threaded/multi-process file handling
- Chunk Management: Optimal chunk sizes for different file types
- Progress Tracking: Real-time processing status updates
- Error Recovery: Resume processing from failure points
- Format Support: Handle CSV, JSON, XML, and binary formats
- Storage Integration: Direct cloud storage processing
Approach: Generator functions, memory mapping, and worker pools for scalability.
36. Design a configuration management system for Python applications.
Answer: Configuration architecture:
- Hierarchical Config: Environment-specific configuration layers
- Dynamic Loading: Runtime configuration updates without restart
- Validation: Configuration schema validation and type checking
- Secret Management: Secure handling of sensitive configuration
- Environment Detection: Automatic environment-based config selection
- Hot Reload: Live configuration updates with change detection
- Audit Trail: Track configuration changes and history
Implementation approach: YAML/JSON config files, environment variable override, and validation schemas.
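A hedged sketch of layered configuration loading (defaults, then a YAML file, then environment variables), assuming PyYAML; the file name and keys are illustrative:
import os
import yaml                                     # PyYAML

DEFAULTS = {"log_level": "INFO", "workers": 4, "database_url": ""}

def load_config(path: str = "config.yaml") -> dict:
    config = dict(DEFAULTS)
    if os.path.exists(path):
        with open(path) as f:
            config.update(yaml.safe_load(f) or {})       # file layer overrides defaults
    for key in config:                                   # environment layer overrides everything
        env_value = os.getenv(key.upper())
        if env_value is not None:
            config[key] = type(DEFAULTS[key])(env_value) # coerce to the default's type
    return config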
37. Build a data pipeline with error handling and retry mechanisms.
Answer: Pipeline architecture:
- Task Definition: Modular pipeline components with clear interfaces
- Dependency Management: DAG-based task execution order
- Error Classification: Distinguish between retryable and permanent errors
- Retry Strategies: Exponential backoff with jitter and circuit breakers
- Dead Letter Queues: Handle permanently failed tasks
- Checkpointing: Save pipeline state for recovery
- Monitoring: Pipeline health and performance metrics
Key patterns: State machines, observer pattern, and graceful error handling.
38. Implement a comprehensive logging and audit system.
Answer: Logging framework:
- Structured Logging: JSON-formatted logs with consistent schema
- Log Levels: Appropriate level usage (DEBUG, INFO, WARN, ERROR)
- Context Propagation: Request ID and user context in all logs
- Audit Trail: Immutable audit records for compliance
- Log Aggregation: Centralized log collection and indexing
- Performance: Asynchronous logging to avoid blocking
- Security: Log sanitization and access controls
Technical approach: Custom formatters, correlation IDs, and audit decorators.
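A minimal sketch of structured JSON logging with a propagated request ID, using only the standard logging module; field names are illustrative:
import json
import logging

class JsonFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        entry = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "request_id": getattr(record, "request_id", None),  # propagated context
        }
        return json.dumps(entry)

handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger = logging.getLogger("app")
logger.addHandler(handler)
logger.setLevel(logging.INFO)
logger.info("user logged in", extra={"request_id": "abc-123"})  # attaches request_id to the record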
39. Design a plugin architecture for extensible Python applications.
Answer: Plugin system design:
- Plugin Interface: Abstract base class defining plugin contract
- Discovery Mechanism: Automatic plugin loading from directories
- Lifecycle Management: Plugin initialization, execution, and cleanup
- Dependency Resolution: Handle plugin dependencies and conflicts
- Configuration: Plugin-specific configuration management
- Security: Plugin sandboxing and permission management
- Hot Loading: Runtime plugin loading and unloading
Architecture patterns: Registry pattern, dependency injection, and event-driven communication.
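A small sketch of a registry-based plugin system using an abstract base class; the Plugin contract and UppercasePlugin example are illustrative:
from abc import ABC, abstractmethod

class Plugin(ABC):
    name: str

    @abstractmethod
    def run(self, data: dict) -> dict: ...

_REGISTRY: dict[str, type[Plugin]] = {}

def register(cls: type[Plugin]) -> type[Plugin]:
    _REGISTRY[cls.name] = cls                   # decorator-based discovery
    return cls

@register
class UppercasePlugin(Plugin):
    name = "uppercase"
    def run(self, data: dict) -> dict:
        return {k: v.upper() if isinstance(v, str) else v for k, v in data.items()}

def run_plugin(name: str, data: dict) -> dict:
    return _REGISTRY[name]().run(data)          # lifecycle: instantiate, then execute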
40. Implement a comprehensive testing framework.
Answer: Testing strategy:
- Test Types: Unit, integration, functional, and performance tests
- Test Data Management: Fixtures, factories, and test databases
- Mocking: External dependencies and service mocking
- Test Organization: Parameterized tests and test suites
- Coverage: Code coverage tracking and reporting
- CI Integration: Automated test execution in pipelines
- Performance Testing: Load testing and benchmark comparisons
Testing tools: pytest, unittest.mock, factory_boy, and coverage.py integration.
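A minimal sketch of fixtures, parametrization, and mocking with pytest and unittest.mock; the tested functions are placeholders:
import pytest
from unittest.mock import patch

@pytest.fixture
def sample_users():
    return [{"name": "alice", "active": True}, {"name": "bob", "active": False}]

@pytest.mark.parametrize("raw,expected", [("3", 3), ("10", 10)])
def test_parse_int(raw, expected):
    assert int(raw) == expected

def test_active_users(sample_users):
    active = [u for u in sample_users if u["active"]]
    assert len(active) == 1

def test_external_call_is_mocked():
    with patch("requests.get") as mock_get:     # isolate the external dependency
        mock_get.return_value.status_code = 200
        import requests
        assert requests.get("http://example.com").status_code == 200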
41. Build a real-time notification system using WebSockets.
Answer: Notification architecture:
- Connection Management: WebSocket connection lifecycle handling
- User Authentication: Secure WebSocket authentication
- Message Routing: Efficient message delivery to target users
- Persistence: Store notifications for offline users
- Scalability: Load balancing across multiple WebSocket servers
- Real-time Updates: Instant notification delivery
- Fallback: Alternative delivery methods for connection failures
Technical implementation: AsyncIO, Redis pub/sub, and connection pooling.
42. Create a data lake analytics solution using Python.
Answer: Analytics platform:
- Data Ingestion: Multi-format data ingestion pipelines
- Schema Discovery: Automatic schema inference and evolution
- Query Engine: SQL-like querying over raw data files
- Data Partitioning: Efficient data organization for query performance
- Metadata Management: Data catalog and lineage tracking
- Processing Framework: Distributed data processing capabilities
- API Layer: RESTful APIs for data access and analytics
Technology approach: Pandas, Dask for distributed computing, and cloud storage integration.
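A hedged sketch of querying partitioned files with Dask; the S3 path and column names are illustrative assumptions:
import dask.dataframe as dd

# Read many Parquet partitions lazily instead of loading everything into memory
events = dd.read_parquet("s3://data-lake/events/year=2024/*.parquet")
daily = (
    events[events["status"] == "ok"]
    .groupby("day")["amount"]
    .sum()
    .compute()                                  # trigger distributed execution
)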
43. Implement a metadata management solution.
Answer: Metadata framework:
- Data Discovery: Automatic scanning and cataloging of data assets
- Schema Registry: Centralized schema management and versioning
- Lineage Tracking: Data flow and transformation tracking
- Business Glossary: Business term definitions and mapping
- Data Quality: Metadata-driven quality assessment
- Search: Full-text search across metadata repository
- Governance: Data stewardship and approval workflows
Implementation approach: Graph databases for lineage, search indexing, and RESTful APIs.
44. Design a disaster recovery and business continuity solution.
Answer: DR strategy:
- Backup Strategy: Automated backup scheduling and verification
- Data Replication: Real-time or near-real-time data replication
- Failover Automation: Automatic failover with health monitoring
- Recovery Testing: Regular DR testing and validation
- Documentation: Recovery procedures and contact information
- Communication: Incident communication and status updates
- Monitoring: System health and replication lag monitoring
Technical approach: Database replication, infrastructure as code, and automated testing.
45. Implement a sophisticated customer analytics solution.
Answer: Analytics platform:
- Data Integration: Multi-source customer data consolidation
- Customer 360: Unified customer profile and journey mapping
- Behavioral Analysis: User interaction and engagement analysis
- Segmentation: Dynamic customer segmentation based on behavior
- Predictive Analytics: Churn prediction and lifetime value modeling
- Real-time Processing: Live customer state updates
- Visualization: Interactive dashboards and reporting
Technical stack: Pandas for analysis, scikit-learn for ML, and visualization libraries.
Project-Based Real World Questions (5)
46. Build a complete e-commerce recommendation system that handles 1M+ users and real-time updates.
Answer: Project approach:
- Data Collection: User behavior tracking, product catalogs, purchase history
- Feature Engineering: User profiles, item features, interaction matrices
- Model Implementation: Collaborative filtering, content-based, and hybrid models
- Real-time Serving: API endpoints with sub-100ms response times
- A/B Testing: Recommendation algorithm comparison framework
- Scalability: Distributed computing for model training and serving
- Performance Monitoring: Track recommendation quality and business metrics
- Data Pipeline: ETL for feature updates and model retraining
47. Create a financial fraud detection system processing millions of transactions daily.
Answer: Fraud detection system:
- Real-time Processing: Stream processing for transaction monitoring
- Feature Engineering: Transaction patterns, user behavior, merchant analysis
- ML Models: Ensemble methods, anomaly detection, and deep learning
- Rule Engine: Business rules combined with ML predictions
- Alert Management: Risk scoring and investigation workflow
- Performance: Low latency prediction (<50ms per transaction)
- Compliance: Audit trails and regulatory reporting
- Feedback Loop: Continuous model improvement from investigations
48. Design a log analysis and monitoring platform for microservices architecture.
Answer: Monitoring platform:
- Log Ingestion: Multi-source log collection and parsing
- Real-time Analytics: Stream processing for immediate insights
- Alerting System: Intelligent alerting with noise reduction
- Dashboard: Real-time system health and performance metrics
- Anomaly Detection: Automated detection of unusual patterns
- Root Cause Analysis: Correlation analysis across services
- Scalability: Handle TB+ of logs daily
- Integration: API integrations with existing monitoring tools
49. Build a content management and delivery system for media streaming.
Answer: Media platform:
- Content Pipeline: Video processing, transcoding, and optimization
- CDN Integration: Global content distribution and caching
- User Management: Authentication, subscriptions, and preferences
- Recommendation Engine: Personalized content recommendations
- Analytics: Viewing patterns, engagement metrics, and performance
- Scalability: Handle millions of concurrent streams
- Quality Adaptation: Adaptive bitrate streaming
- Content Protection: DRM and piracy prevention
50. Create a data migration platform for legacy system modernization.
Answer: Migration platform:
- Assessment: Legacy system analysis and mapping
- Data Mapping: Source-to-target schema transformation
- ETL Pipeline: Incremental and full data migration
- Validation Framework: Data integrity and completeness verification
- Rollback Strategy: Safe migration with rollback capabilities
- Performance: Minimize downtime during migration
- Monitoring: Real-time migration progress and issue tracking
- Documentation: Complete migration documentation and procedures
Optimization Questions (5)
51. Your Python application processes 100K records but takes 30 minutes. How do you optimize it?
Answer: Optimization strategy:
- Profiling: Use cProfile and memory_profiler to identify bottlenecks
- Vectorization: Replace loops with pandas/numpy operations
- Parallel Processing: Use multiprocessing for CPU-bound tasks
- Database Optimization: Bulk operations instead of row-by-row processing
- Memory Management: Generator expressions instead of lists
- Caching: Cache expensive computations and database queries
- Algorithm Improvement: Use more efficient algorithms and data structures
# Before: Slow loop
results = [expensive_function(item) for item in large_list]
# After: Parallel processing
from multiprocessing import Pool
with Pool() as pool:
    results = pool.map(expensive_function, large_list)
52. How do you optimize memory usage for processing large datasets in Python?
Answer: Memory optimization:
- Generators: Use generators instead of storing everything in memory
- Chunking: Process data in smaller chunks
- Data Types: Use appropriate data types (int32 vs int64)
- Memory Mapping: Use mmap for large file processing
- Garbage Collection: Manual garbage collection for large objects
- Streaming: Process data as it comes instead of loading all
# Memory-efficient file processing
def process_large_file(filename):
    with open(filename, 'r') as file:
        for line in file:  # Generator, not loading all lines
            yield process_line(line)
# Chunked processing
def process_dataframe_chunks(df, chunk_size=10000):
    for i in range(0, len(df), chunk_size):
        chunk = df[i:i + chunk_size]
        yield process_chunk(chunk)
53. Optimize a Python web API that has slow response times under load.
Answer: API optimization:
- Async Programming: Use FastAPI/aiohttp for concurrent request handling
- Database Connection Pooling: Reuse database connections
- Caching: Implement Redis caching for frequently accessed data
- Query Optimization: Optimize database queries and use indexes
- Response Compression: Enable gzip compression
- Load Balancing: Distribute load across multiple instances
- Monitoring: Track response times and bottlenecks
# Async endpoint with caching
@app.get("/users/{user_id}")
async def get_user(user_id: int, redis: Redis = Depends(get_redis)):
    cache_key = f"user:{user_id}"
    cached_user = await redis.get(cache_key)
    if cached_user:
        return json.loads(cached_user)
    user = await database.fetch_user(user_id)
    await redis.setex(cache_key, 300, json.dumps(user))
    return user
54. How do you optimize Python code for CPU-intensive mathematical calculations?
Answer: Mathematical optimization:
- NumPy: Use vectorized operations instead of Python loops
- Numba: JIT compilation for numeric functions
- Cython: Compile Python to C for performance gains
- Parallel Computing: Use multiprocessing for independent calculations
- Algorithm Choice: Choose efficient algorithms (O(n) vs O(n²))
- Memory Layout: Optimize data structures for cache efficiency
import numpy as np
from numba import jit
# Optimized with NumPy and Numba
@jit(nopython=True)
def fast_calculation(data):
    return np.sum(data ** 2) / len(data)
# Vectorized operations
result = np.mean(large_array ** 2) # Much faster than loops
55. Optimize a data pipeline that processes millions of records daily.
Answer: Pipeline optimization:
- Batch Processing: Process records in optimal batch sizes
- Parallel Workers: Use multiple processes/threads for parallel processing
- Database Bulk Operations: Use bulk inserts/updates instead of individual operations
- Memory Management: Stream processing to avoid memory issues
- Error Handling: Efficient error handling without stopping entire pipeline
- Monitoring: Track processing rates and identify bottlenecks
- Resource Allocation: Optimize CPU and memory usage
# Optimized batch processing
def process_records_batch(records, batch_size=1000):
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        processed_batch = [transform_record(r) for r in batch]
        database.bulk_insert(processed_batch)  # Bulk operation
# Parallel processing
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor(max_workers=4) as executor:
    results = executor.map(process_batch, record_batches)
Cost Saving and Best Practices Questions (5)
56. What are the key strategies for writing maintainable and scalable Python code?
Answer: Best practices:
- Code Organization: Use proper module structure and separation of concerns
- Type Hints: Add type annotations for better code documentation
- Documentation: Write clear docstrings and maintain README files
- Testing: Comprehensive unit tests and integration tests
- Error Handling: Proper exception handling and logging
- Code Reviews: Implement peer review processes
- Linting: Use tools like pylint, black, and isort for code quality
from typing import List, Optional

def process_user_data(
    users: List[dict],
    filter_active: bool = True
) -> Optional[List[dict]]:
    """
    Process user data with optional filtering.

    Args:
        users: List of user dictionaries
        filter_active: Whether to filter only active users

    Returns:
        Processed user data or None if input is invalid
    """
    if not users:
        return None
    if filter_active:
        users = [u for u in users if u.get('active', False)]
    return [transform_user(user) for user in users]
57. How do you implement efficient error handling and logging in Python applications?
Answer: Error handling best practices:
- Specific Exceptions: Catch specific exceptions, not generic Exception
- Logging Strategy: Use appropriate log levels and structured logging
- Error Context: Include relevant context in error messages
- Graceful Degradation: Handle errors without crashing the application
- Monitoring: Implement error tracking and alerting
- Documentation: Document expected exceptions and handling
import logging
import requests
from typing import Optional

logger = logging.getLogger(__name__)

def safe_api_call(url: str) -> Optional[dict]:
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.Timeout:
        logger.warning(f"API call timeout for URL: {url}")
        return None
    except requests.exceptions.HTTPError as e:
        logger.error(f"HTTP error {e.response.status_code} for URL: {url}")
        return None
    except ValueError as e:
        logger.error(f"Invalid JSON response from {url}: {e}")
        return None
58. What are the security best practices for Python applications?
Answer: Security practices:
- Input Validation: Validate and sanitize all user inputs
- SQL Injection Prevention: Use parameterized queries
- Secret Management: Never hardcode secrets, use environment variables
- Authentication: Implement proper authentication and authorization
- Dependencies: Keep dependencies updated and scan for vulnerabilities
- Logging Security: Don't log sensitive information
- HTTPS: Use secure communication protocols
import os

# Secure configuration: secrets come from the environment, never hardcoded
class Config:
    SECRET_KEY = os.getenv('SECRET_KEY')
    DATABASE_URL = os.getenv('DATABASE_URL')

    @classmethod
    def validate_config(cls):
        if not cls.SECRET_KEY:
            raise ValueError("SECRET_KEY environment variable not set")

# Secure database query
def get_user_by_id(user_id: int):
    query = "SELECT * FROM users WHERE id = %s"
    return database.execute(query, (user_id,))  # Parameterized query
59. How do you optimize Python application performance while minimizing infrastructure costs?
Answer: Cost optimization strategies:
- Efficient Algorithms: Choose algorithms with better time complexity
- Resource Monitoring: Track CPU and memory usage patterns
- Caching: Implement caching to reduce database and API calls
- Async Programming: Use async/await for I/O-bound operations
- Database Optimization: Optimize queries and use connection pooling
- Auto-scaling: Scale resources based on demand
- Profiling: Regular performance profiling and optimization
# Cost-effective caching decorator
import json
import redis

redis_client = redis.Redis()

def cached_database_call(expiry=300):
    def decorator(func):
        def wrapper(*args, **kwargs):
            cache_key = f"{func.__name__}:{hash(str(args) + str(kwargs))}"
            # Try cache first
            cached_result = redis_client.get(cache_key)
            if cached_result:
                return json.loads(cached_result)
            # Call function and cache result
            result = func(*args, **kwargs)
            redis_client.setex(cache_key, expiry, json.dumps(result))
            return result
        return wrapper
    return decorator
60. What are the key considerations for deploying Python applications in production?
Answer: Production deployment:
- Environment Management: Use virtual environments and dependency management
- Configuration: Environment-based configuration management
- Process Management: Use process managers like Gunicorn or uWSGI
- Monitoring: Application performance and health monitoring
- Logging: Centralized logging and log rotation
- Security: Secure deployment practices and access controls
- Backup Strategy: Regular backups and disaster recovery plans
# Production configuration
import os
import time
from dataclasses import dataclass

@dataclass
class ProductionConfig:
    debug: bool = False
    workers: int = int(os.getenv('WORKERS', '4'))
    log_level: str = os.getenv('LOG_LEVEL', 'INFO')
    database_url: str = os.getenv('DATABASE_URL')
    redis_url: str = os.getenv('REDIS_URL')

    def __post_init__(self):
        if not self.database_url:
            raise ValueError("DATABASE_URL must be set in production")

# Health check endpoint
@app.get("/health")
async def health_check():
    try:
        # Check database connectivity
        await database.fetch_one("SELECT 1")
        return {"status": "healthy", "timestamp": time.time()}
    except Exception as e:
        return {"status": "unhealthy", "error": str(e)}
Additional Scenario-Based Questions
Quick Coding Scenarios (Interview Favorites)
1. Remove duplicates from a string:
def remove_duplicates(s):
    return ''.join(dict.fromkeys(s))
2. Find first non-repeating character:
def first_non_repeat(s):
    char_count = {}
    for char in s:
        char_count[char] = char_count.get(char, 0) + 1
    for char in s:
        if char_count[char] == 1:
            return char
    return None
3. Check if two strings are anagrams:
def are_anagrams(s1, s2):
    return sorted(s1.lower()) == sorted(s2.lower())
4. Find maximum subarray sum (Kadane's algorithm):
def max_subarray_sum(arr):
    max_sum = current_sum = arr[0]
    for num in arr[1:]:
        current_sum = max(num, current_sum + num)
        max_sum = max(max_sum, current_sum)
    return max_sum
5. Implement LRU cache:
from collections import OrderedDict

class LRUCache:
    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = OrderedDict()

    def get(self, key):
        if key in self.cache:
            self.cache.move_to_end(key)
            return self.cache[key]
        return -1

    def put(self, key, value):
        if key in self.cache:
            self.cache.move_to_end(key)
        self.cache[key] = value
        if len(self.cache) > self.capacity:
            self.cache.popitem(last=False)  # Evict the least-recently-used entry
This comprehensive guide covers practical Python scenarios that are commonly asked in interviews, focusing on real-world problems with concise, implementable solutions.