Python Engineer - Interview Q&A Guide

Basic Questions (5)

1. What are the key differences between Python 2 and Python 3, and what are Python's main features?

Answer: Python 3 key differences:

  • Print is a function: print("hello") vs print "hello"
  • Unicode by default: Strings are Unicode in Python 3
  • Integer division: / returns float, // for integer division
  • range() returns a lazy range object instead of a list

Python features: Dynamic typing, interpreted language, object-oriented with multiple inheritance, extensive standard library, cross-platform compatibility, strong community and ecosystem.
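
A quick, runnable illustration of these differences on any Python 3 interpreter:

print("hello")          # print is a function, not a statement
print(7 / 2)            # 3.5 -> / is true division
print(7 // 2)           # 3   -> // is floor division
print(type(range(3)))   # <class 'range'> -> a lazy range object, not a list
print(type("héllo"))    # <class 'str'>   -> text is Unicode by default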

2. How do you reverse a string in Python? Show 3 different ways.

Answer:

s = "hello"
# Method 1: Slicing
reversed1 = s[::-1]
# Method 2: Built-in function
reversed2 = ''.join(reversed(s))
# Method 3: Loop
reversed3 = ''.join(s[i] for i in range(len(s)-1, -1, -1))

3. What is the difference between == and is operators? Give a practical example.

Answer:

  • == compares values (equality)
  • is compares object identity (same object in memory)
a = [1, 2, 3]
b = [1, 2, 3]
c = a
print(a == b)  # True (same values)
print(a is b)  # False (different objects)
print(a is c)  # True (same object)

4. How do you find duplicate elements in a list?

Answer:

# Method 1: Using set + count (simple but O(n^2) because of count)
nums = [1, 2, 3, 2, 4, 3]
duplicates = list(set(x for x in nums if nums.count(x) > 1))

# Method 2: Single pass with a dictionary
seen = {}
duplicates = [x for x in nums if x in seen or seen.setdefault(x, False)]
# First occurrence: setdefault stores x and returns False, so x is skipped;
# later occurrences: `x in seen` is True, so x is kept as a duplicate.

5. What are list comprehensions? Write a comprehension to get even squares from 1 to 10.

Answer: List comprehensions provide a concise way to create lists:

# Basic syntax: [expression for item in iterable if condition]
even_squares = [x**2 for x in range(1, 11) if x % 2 == 0]
# Result: [4, 16, 36, 64, 100]

Intermediate Questions (20)

6. How do you remove specific lines from a file based on content?

Answer:

# Remove lines containing "DELETE"
with open('input.txt', 'r') as infile, open('output.txt', 'w') as outfile:
    for line in infile:
        if "DELETE" not in line:
            outfile.write(line)

# Rewrite the same file (reads all lines into memory first)
with open('file.txt') as f:
    lines = f.readlines()
with open('file.txt', 'w') as f:
    f.writelines(l for l in lines if 'DELETE' not in l)

7. Write a function to find the second largest number in a list.

Answer:

def second_largest(numbers):
    if len(numbers) < 2:
        return None
    unique_nums = list(set(numbers))
    unique_nums.sort()
    return unique_nums[-2] if len(unique_nums) >= 2 else None

# One-liner approach
second_largest = lambda nums: sorted(set(nums))[-2] if len(set(nums)) >= 2 else None

8. How do you count word frequency in a text? Handle case sensitivity.

Answer:

from collections import Counter
import re

def word_frequency(text):
    words = re.findall(r'\b\w+\b', text.lower())
    return Counter(words)

# Usage
text = "Hello world. Hello Python world!"
freq = word_frequency(text)  # {'hello': 2, 'world': 2, 'python': 1}

9. Implement a function to check if a string is a palindrome (ignore spaces and case).

Answer:

def is_palindrome(s):
    cleaned = ''.join(c.lower() for c in s if c.isalnum())
    return cleaned == cleaned[::-1]

# Alternative using two pointers
def is_palindrome_v2(s):
    left, right = 0, len(s) - 1
    while left < right:
        if not s[left].isalnum(): left += 1
        elif not s[right].isalnum(): right -= 1
        elif s[left].lower() != s[right].lower(): return False
        else: left, right = left + 1, right - 1
    return True

10. How do you flatten a nested list?

Answer:

# Recursive approach
def flatten(lst):
    result = []
    for item in lst:
        if isinstance(item, list):
            result.extend(flatten(item))
        else:
            result.append(item)
    return result

# Using itertools (flattens one level only)
from itertools import chain
nested_list = [[1, 2], [3, 4], [5]]
flat = list(chain.from_iterable(nested_list))  # [1, 2, 3, 4, 5]

11. Write a function to rotate a list by n positions.

Answer:

def rotate_list(lst, n):
    if not lst:
        return lst
    n = n % len(lst)  # Handle n > len(lst)
    return lst[-n:] + lst[:-n]

# In-place rotation using slice assignment
def rotate_inplace(lst, n):
    if not lst:
        return
    n = n % len(lst)
    lst[:] = lst[-n:] + lst[:-n]

12. How do you find missing numbers in a sequence from 1 to n?

Answer:

def find_missing(nums, n):
    # Method 1: Using set
    present = set(nums)
    return [i for i in range(1, n+1) if i not in present]

# Method 2: Mathematical approach
def find_missing_math(nums, n):
    expected_sum = n * (n + 1) // 2
    actual_sum = sum(nums)
    return expected_sum - actual_sum  # Works for single missing number

13. Implement a simple cache decorator with expiration.

Answer:

import time
from functools import wraps

def cache_with_timeout(timeout_seconds):
    def decorator(func):
        cache = {}
        @wraps(func)
        def wrapper(*args):
            key = args
            if key in cache:
                result, timestamp = cache[key]
                if time.time() - timestamp < timeout_seconds:
                    return result
            result = func(*args)
            cache[key] = (result, time.time())
            return result
        return wrapper
    return decorator

14. How do you merge two sorted lists into one sorted list?

Answer:

def merge_sorted_lists(list1, list2):
    merged = []
    i = j = 0
    
    while i < len(list1) and j < len(list2):
        if list1[i] <= list2[j]:
            merged.append(list1[i])
            i += 1
        else:
            merged.append(list2[j])
            j += 1
    
    merged.extend(list1[i:])
    merged.extend(list2[j:])
    return merged

15. Write a function to group anagrams together.

Answer:

from collections import defaultdict

def group_anagrams(words):
    groups = defaultdict(list)
    for word in words:
        key = ''.join(sorted(word.lower()))
        groups[key].append(word)
    return list(groups.values())

# Usage
words = ["eat", "tea", "tan", "ate", "nat", "bat"]
# Result: [['eat', 'tea', 'ate'], ['tan', 'nat'], ['bat']]

16. How do you find the intersection of two lists while preserving order?

Answer:

def intersection_ordered(list1, list2):
    set2 = set(list2)
    return [x for x in list1 if x in set2]

# To maintain unique elements
def intersection_unique(list1, list2):
    seen = set()
    result = []
    set2 = set(list2)
    for x in list1:
        if x in set2 and x not in seen:
            result.append(x)
            seen.add(x)
    return result

17. Implement a function to convert Roman numerals to integers.

Answer:

def roman_to_int(roman):
    values = {'I': 1, 'V': 5, 'X': 10, 'L': 50, 
              'C': 100, 'D': 500, 'M': 1000}
    total = 0
    prev_value = 0
    
    for char in reversed(roman):
        value = values[char]
        if value < prev_value:
            total -= value
        else:
            total += value
        prev_value = value
    return total

18. How do you remove duplicates from a list while preserving order?

Answer:

def remove_duplicates_ordered(lst):
    seen = set()
    result = []
    for item in lst:
        if item not in seen:
            seen.add(item)
            result.append(item)
    return result

# Using dict (Python 3.7+)
def remove_duplicates_dict(lst):
    return list(dict.fromkeys(lst))

19. Write a function to find the longest common prefix among strings.

Answer:

def longest_common_prefix(strings):
    if not strings:
        return ""
    
    min_len = min(len(s) for s in strings)
    for i in range(min_len):
        char = strings[0][i]
        if not all(s[i] == char for s in strings):
            return strings[0][:i]
    return strings[0][:min_len]

20. How do you implement a simple rate limiter?

Answer:

import time
from collections import defaultdict

class RateLimiter:
    def __init__(self, max_requests, time_window):
        self.max_requests = max_requests
        self.time_window = time_window
        self.requests = defaultdict(list)
    
    def is_allowed(self, user_id):
        now = time.time()
        user_requests = self.requests[user_id]
        
        # Remove old requests
        user_requests[:] = [req_time for req_time in user_requests 
                           if now - req_time < self.time_window]
        
        if len(user_requests) < self.max_requests:
            user_requests.append(now)
            return True
        return False

21. How do you read a large file efficiently without loading it entirely into memory?

Answer:

def read_large_file_chunks(filename, chunk_size=8192):
    with open(filename, 'r') as file:
        while True:
            chunk = file.read(chunk_size)
            if not chunk:
                break
            yield chunk

# Process line by line
def process_large_file(filename):
    with open(filename, 'r') as file:
        for line_num, line in enumerate(file, 1):
            # Process each line individually
            yield line_num, line.strip()

22. Implement a function to validate email addresses using regex.

Answer:

import re

def is_valid_email(email):
    pattern = r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
    return bool(re.match(pattern, email))

def extract_emails_from_text(text):
    pattern = r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
    return re.findall(pattern, text)

# Batch validation
def validate_email_list(emails):
    return {email: is_valid_email(email) for email in emails}

23. How do you implement a simple retry mechanism with exponential backoff?

Answer:

import time
import random
from functools import wraps

def retry_with_backoff(max_retries=3, base_delay=1, max_delay=60):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_retries - 1:
                        raise e
                    
                    delay = min(base_delay * (2 ** attempt), max_delay)
                    jitter = random.uniform(0, 0.1) * delay
                    time.sleep(delay + jitter)
            
        return wrapper
    return decorator

24. Write a function to convert a nested dictionary to dot notation.

Answer:

def flatten_dict(nested_dict, parent_key='', sep='.'):
    items = []
    for key, value in nested_dict.items():
        new_key = f"{parent_key}{sep}{key}" if parent_key else key
        
        if isinstance(value, dict):
            items.extend(flatten_dict(value, new_key, sep).items())
        else:
            items.append((new_key, value))
    
    return dict(items)

# Example: {'a': {'b': {'c': 1}}} -> {'a.b.c': 1}

25. How do you implement a simple thread-safe counter?

Answer:

import threading

class ThreadSafeCounter:
    def __init__(self, initial_value=0):
        self._value = initial_value
        self._lock = threading.Lock()
    
    def increment(self, amount=1):
        with self._lock:
            self._value += amount
            return self._value
    
    def get_value(self):
        with self._lock:
            return self._value

Complex Questions (20)

26. Design a scalable web scraping system with rate limiting and error handling.

Answer: High-level approach:

  1. Architecture: Use async/await with aiohttp for concurrent requests
  2. Rate Limiting: Implement token bucket algorithm per domain
  3. Error Handling: Exponential backoff retry with circuit breaker pattern
  4. Data Storage: Queue system (Redis/RabbitMQ) for URLs and results
  5. Monitoring: Track success rates, response times, and error patterns
  6. Scaling: Distribute across multiple workers with shared state
  7. Compliance: Respect robots.txt and implement delays per site

Key components: RateLimiter class, RetryHandler, CircuitBreaker, URLQueue, DataProcessor, and WorkerPool manager.
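
As a concrete illustration of the per-domain rate limiting in item 2, here is a minimal token-bucket sketch using only the standard library (the TokenBucket class and can_fetch helper are illustrative names, not an existing library):

import time

class TokenBucket:
    def __init__(self, rate, capacity):
        self.rate = rate            # tokens added per second
        self.capacity = capacity    # maximum burst size
        self.tokens = capacity
        self.last_refill = time.monotonic()

    def allow(self):
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last_refill) * self.rate)
        self.last_refill = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

buckets = {}  # one bucket per domain

def can_fetch(domain, rate=2, capacity=5):
    bucket = buckets.setdefault(domain, TokenBucket(rate, capacity))
    return bucket.allow()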

27. Implement a real-time data processing pipeline with Kafka integration.

Answer: Pipeline design:

  1. Producer: Kafka producer for real-time data ingestion
  2. Consumer Groups: Multiple consumers for parallel processing
  3. Stream Processing: Process messages in micro-batches
  4. Error Handling: Dead letter queues for failed messages
  5. State Management: Redis/database for processing state
  6. Monitoring: Track lag, throughput, and error rates
  7. Scaling: Auto-scale consumers based on queue depth

Key patterns: Consumer-producer pattern, batch processing, checkpoint management, and graceful shutdown handling.
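
A minimal consumer sketch of the micro-batch pattern, assuming the third-party kafka-python package; the topic name, group id, and process_batch function are placeholders:

import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "events",                              # hypothetical topic
    bootstrap_servers="localhost:9092",
    group_id="processing-group",
    enable_auto_commit=False,              # commit only after a batch succeeds
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

def process_batch(records):
    ...  # placeholder: transform, enrich, or persist the records

while True:
    batch = consumer.poll(timeout_ms=1000, max_records=100)  # micro-batch
    for partition, records in batch.items():
        process_batch(records)
    if batch:
        consumer.commit()  # checkpoint progress after the batch is processed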

28. Build a microservices architecture with FastAPI including authentication and database integration.

Answer: Microservices design:

  1. Service Structure: Separate services for auth, user management, business logic
  2. API Gateway: Central routing and authentication validation
  3. Database: Service-specific databases with connection pooling
  4. Authentication: JWT tokens with refresh mechanism
  5. Inter-service Communication: HTTP/gRPC with circuit breakers
  6. Service Discovery: Registry pattern for service location
  7. Deployment: Docker containers with health checks

Core components: FastAPI apps, SQLAlchemy models, Pydantic schemas, dependency injection, and middleware.
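
A minimal sketch of JWT validation as a FastAPI dependency, assuming the PyJWT package; the secret key and payload shape are illustrative:

import jwt
from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import OAuth2PasswordBearer

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
SECRET_KEY = "change-me"  # in practice, load from the environment

def get_current_user(token: str = Depends(oauth2_scheme)) -> dict:
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
    except jwt.PyJWTError:
        raise HTTPException(status_code=401, detail="Invalid or expired token")
    return {"user_id": payload.get("sub")}

@app.get("/me")
async def read_me(user: dict = Depends(get_current_user)):
    return user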

29. Create a machine learning pipeline with model training, validation, and deployment.

Answer: ML pipeline architecture:

  1. Data Pipeline: ETL for feature engineering and validation
  2. Model Training: Automated training with hyperparameter tuning
  3. Model Validation: Cross-validation and performance metrics
  4. Model Registry: Version control for models and metadata
  5. Deployment: A/B testing framework for model comparison
  6. Monitoring: Model drift detection and performance tracking
  7. Feedback Loop: Continuous learning from production data

Implementation approach: Use MLflow for tracking, scikit-learn for models, and FastAPI for serving endpoints.
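
A minimal training-and-validation sketch with scikit-learn (the synthetic dataset and parameter grid are illustrative; MLflow tracking and deployment are omitted):

from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import GridSearchCV, train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

X, y = make_classification(n_samples=1000, n_features=20, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

pipeline = Pipeline([
    ("scale", StandardScaler()),
    ("model", LogisticRegression(max_iter=1000)),
])

# Hyperparameter tuning with 5-fold cross-validation
search = GridSearchCV(pipeline, {"model__C": [0.1, 1.0, 10.0]}, cv=5, scoring="f1")
search.fit(X_train, y_train)
print(search.best_params_, search.score(X_test, y_test))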

30. Design a distributed task queue system using Celery and Redis.

Answer: Task queue architecture:

  1. Message Broker: Redis for task queue and result storage
  2. Worker Management: Multiple worker processes with auto-scaling
  3. Task Routing: Route tasks to specific workers based on type
  4. Monitoring: Task status tracking and performance metrics
  5. Error Handling: Retry logic with exponential backoff
  6. Priority Queues: Different priority levels for task execution
  7. Workflow Management: Task dependencies and chaining

Key features: Task decorators, result callbacks, periodic tasks, and worker health monitoring.
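
A minimal Celery task sketch assuming a local Redis broker; TransientError and charge_payment are illustrative placeholders for real business logic:

from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379/0", backend="redis://localhost:6379/1")

class TransientError(Exception):
    """Illustrative retryable error."""

def charge_payment(order_id):
    ...  # placeholder for the real work

@app.task(bind=True, max_retries=3)
def process_order(self, order_id):
    try:
        charge_payment(order_id)
    except TransientError as exc:
        # Exponential backoff: 10s, 20s, 40s between retries
        raise self.retry(exc=exc, countdown=10 * (2 ** self.request.retries))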

31. Build a real-time chat application using WebSockets and async Python.

Answer: Chat system design:

  1. WebSocket Management: Connection handling and user authentication
  2. Message Broadcasting: Efficient message distribution to connected users
  3. Room Management: User groups and private messaging
  4. Message Persistence: Store chat history in database
  5. Presence System: Online/offline status tracking
  6. Load Balancing: Distribute connections across multiple servers
  7. Security: Message validation and rate limiting

Technical approach: AsyncIO for concurrency, Redis for pub/sub, and database for persistence.
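
A minimal single-server room-broadcast sketch using FastAPI WebSockets (a Redis pub/sub layer would be required to fan out across multiple servers):

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()
rooms: dict[str, set[WebSocket]] = {}

@app.websocket("/ws/{room}")
async def chat(websocket: WebSocket, room: str):
    await websocket.accept()
    rooms.setdefault(room, set()).add(websocket)
    try:
        while True:
            message = await websocket.receive_text()
            # Broadcast to everyone else in the same room
            for peer in rooms[room]:
                if peer is not websocket:
                    await peer.send_text(message)
    except WebSocketDisconnect:
        rooms[room].discard(websocket)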

32. Implement a caching system with Redis for high-performance applications.

Answer: Caching strategy:

  1. Cache Patterns: Implement cache-aside, write-through, and write-behind
  2. Eviction Policies: LRU, LFU, and TTL-based expiration
  3. Cache Warming: Pre-populate cache with frequently accessed data
  4. Distributed Caching: Consistent hashing for cache distribution
  5. Cache Invalidation: Smart invalidation based on data dependencies
  6. Monitoring: Hit rates, memory usage, and performance metrics
  7. Fallback Strategy: Graceful degradation when cache is unavailable

Implementation details: Decorator patterns, pipeline operations, and connection pooling.

33. Design a data validation and transformation system using Pydantic.

Answer: Validation framework:

  1. Schema Definition: Comprehensive data models with validation rules
  2. Custom Validators: Business logic validation functions
  3. Error Handling: Detailed error reporting and aggregation
  4. Data Transformation: Automatic type conversion and cleaning
  5. Batch Processing: Validate large datasets efficiently
  6. API Integration: Seamless FastAPI integration
  7. Configuration: Environment-based validation rules

Core concepts: BaseModel classes, validator decorators, and error serialization.
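
A minimal validation sketch, assuming Pydantic v2 (field names and rules are illustrative):

from pydantic import BaseModel, ValidationError, field_validator

class UserIn(BaseModel):
    email: str
    age: int

    @field_validator("email")
    @classmethod
    def email_must_have_at(cls, v: str) -> str:
        if "@" not in v:
            raise ValueError("invalid email address")
        return v.lower()  # normalize during validation

try:
    UserIn(email="ALICE@example.com", age="30")  # "30" is coerced to int
except ValidationError as exc:
    print(exc.errors())  # structured, aggregated error report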

34. Build a monitoring and alerting system for Python applications.

Answer: Monitoring architecture:

  1. Metrics Collection: System and application metrics gathering
  2. Health Checks: Service availability and dependency monitoring
  3. Alert Rules: Threshold-based and anomaly detection alerting
  4. Notification System: Multi-channel alert delivery (email, Slack, SMS)
  5. Dashboard: Real-time metrics visualization
  6. Log Aggregation: Centralized logging with structured data
  7. Performance Tracking: Response times and error rate monitoring

Technology stack: Prometheus metrics, structured logging, and alert manager integration.
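
A minimal metrics-collection sketch for item 1, assuming the prometheus_client package; metric names and the request handler are illustrative:

import time
from prometheus_client import Counter, Histogram, start_http_server

REQUESTS = Counter("app_requests_total", "Total requests", ["endpoint"])
LATENCY = Histogram("app_request_seconds", "Request latency", ["endpoint"])

def handle_request(endpoint: str):
    REQUESTS.labels(endpoint=endpoint).inc()
    with LATENCY.labels(endpoint=endpoint).time():
        time.sleep(0.05)  # placeholder for real work

if __name__ == "__main__":
    start_http_server(8000)  # exposes /metrics for Prometheus to scrape
    while True:
        handle_request("/users")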

35. Implement a file processing system for large files.

Answer: File processing strategy:

  1. Streaming Processing: Process files without loading into memory
  2. Parallel Processing: Multi-threaded/multi-process file handling
  3. Chunk Management: Optimal chunk sizes for different file types
  4. Progress Tracking: Real-time processing status updates
  5. Error Recovery: Resume processing from failure points
  6. Format Support: Handle CSV, JSON, XML, and binary formats
  7. Storage Integration: Direct cloud storage processing

Approach: Generator functions, memory mapping, and worker pools for scalability.

36. Design a configuration management system for Python applications.

Answer: Configuration architecture:

  1. Hierarchical Config: Environment-specific configuration layers
  2. Dynamic Loading: Runtime configuration updates without restart
  3. Validation: Configuration schema validation and type checking
  4. Secret Management: Secure handling of sensitive configuration
  5. Environment Detection: Automatic environment-based config selection
  6. Hot Reload: Live configuration updates with change detection
  7. Audit Trail: Track configuration changes and history

Implementation approach: YAML/JSON config files, environment variable override, and validation schemas.

37. Build a data pipeline with error handling and retry mechanisms.

Answer: Pipeline architecture:

  1. Task Definition: Modular pipeline components with clear interfaces
  2. Dependency Management: DAG-based task execution order
  3. Error Classification: Distinguish between retryable and permanent errors
  4. Retry Strategies: Exponential backoff with jitter and circuit breakers
  5. Dead Letter Queues: Handle permanently failed tasks
  6. Checkpointing: Save pipeline state for recovery
  7. Monitoring: Pipeline health and performance metrics

Key patterns: State machines, observer pattern, and graceful error handling.

38. Implement a comprehensive logging and audit system.

Answer: Logging framework:

  1. Structured Logging: JSON-formatted logs with consistent schema
  2. Log Levels: Appropriate level usage (DEBUG, INFO, WARN, ERROR)
  3. Context Propagation: Request ID and user context in all logs
  4. Audit Trail: Immutable audit records for compliance
  5. Log Aggregation: Centralized log collection and indexing
  6. Performance: Asynchronous logging to avoid blocking
  7. Security: Log sanitization and access controls

Technical approach: Custom formatters, correlation IDs, and audit decorators.
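
A minimal structured-logging sketch using only the standard library; the request_id correlation field is an illustrative convention:

import json
import logging

class JsonFormatter(logging.Formatter):
    def format(self, record):
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
            "request_id": getattr(record, "request_id", None),
        }
        return json.dumps(payload)

handler = logging.StreamHandler()
handler.setFormatter(JsonFormatter())
logger = logging.getLogger("app")
logger.addHandler(handler)
logger.setLevel(logging.INFO)

# Attach a correlation id to an individual log record via `extra`
logger.info("user login succeeded", extra={"request_id": "req-123"})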

39. Design a plugin architecture for extensible Python applications.

Answer: Plugin system design:

  1. Plugin Interface: Abstract base class defining plugin contract
  2. Discovery Mechanism: Automatic plugin loading from directories
  3. Lifecycle Management: Plugin initialization, execution, and cleanup
  4. Dependency Resolution: Handle plugin dependencies and conflicts
  5. Configuration: Plugin-specific configuration management
  6. Security: Plugin sandboxing and permission management
  7. Hot Loading: Runtime plugin loading and unloading

Architecture patterns: Registry pattern, dependency injection, and event-driven communication.
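
A minimal registry-based sketch of the plugin contract and lookup pattern, using only the standard library:

from abc import ABC, abstractmethod

class Plugin(ABC):
    """Contract every plugin must implement."""
    name: str

    @abstractmethod
    def run(self, data):
        ...

_REGISTRY: dict[str, type[Plugin]] = {}

def register(cls: type[Plugin]) -> type[Plugin]:
    _REGISTRY[cls.name] = cls   # decorator adds the class to the registry
    return cls

@register
class UppercasePlugin(Plugin):
    name = "uppercase"

    def run(self, data):
        return data.upper()

# Look up and execute a plugin by name
print(_REGISTRY["uppercase"]().run("hello"))  # HELLO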

40. Implement a comprehensive testing framework.

Answer: Testing strategy:

  1. Test Types: Unit, integration, functional, and performance tests
  2. Test Data Management: Fixtures, factories, and test databases
  3. Mocking: External dependencies and service mocking
  4. Test Organization: Parameterized tests and test suites
  5. Coverage: Code coverage tracking and reporting
  6. CI Integration: Automated test execution in pipelines
  7. Performance Testing: Load testing and benchmark comparisons

Testing tools: pytest, unittest.mock, factory_boy, and coverage.py integration.
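
A minimal sketch of parameterized tests and mocking with pytest and unittest.mock (fetch_user stands in for an external dependency):

import pytest
from unittest.mock import patch

def add(a, b):
    return a + b

@pytest.mark.parametrize("a, b, expected", [(1, 2, 3), (-1, 1, 0), (0, 0, 0)])
def test_add(a, b, expected):
    assert add(a, b) == expected

def fetch_user(user_id):
    raise NotImplementedError("real network call, replaced in tests")

def get_username(user_id):
    return fetch_user(user_id)["name"]

def test_get_username_mocks_external_call():
    # Patch the external call so the test is fast and deterministic
    with patch(__name__ + ".fetch_user", return_value={"name": "alice"}):
        assert get_username(1) == "alice"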

41. Build a real-time notification system using WebSockets.

Answer: Notification architecture:

  1. Connection Management: WebSocket connection lifecycle handling
  2. User Authentication: Secure WebSocket authentication
  3. Message Routing: Efficient message delivery to target users
  4. Persistence: Store notifications for offline users
  5. Scalability: Load balancing across multiple WebSocket servers
  6. Real-time Updates: Instant notification delivery
  7. Fallback: Alternative delivery methods for connection failures

Technical implementation: AsyncIO, Redis pub/sub, and connection pooling.

42. Create a data lake analytics solution using Python.

Answer: Analytics platform:

  1. Data Ingestion: Multi-format data ingestion pipelines
  2. Schema Discovery: Automatic schema inference and evolution
  3. Query Engine: SQL-like querying over raw data files
  4. Data Partitioning: Efficient data organization for query performance
  5. Metadata Management: Data catalog and lineage tracking
  6. Processing Framework: Distributed data processing capabilities
  7. API Layer: RESTful APIs for data access and analytics

Technology approach: Pandas, Dask for distributed computing, and cloud storage integration.

43. Implement a metadata management solution.

Answer: Metadata framework:

  1. Data Discovery: Automatic scanning and cataloging of data assets
  2. Schema Registry: Centralized schema management and versioning
  3. Lineage Tracking: Data flow and transformation tracking
  4. Business Glossary: Business term definitions and mapping
  5. Data Quality: Metadata-driven quality assessment
  6. Search: Full-text search across metadata repository
  7. Governance: Data stewardship and approval workflows

Implementation approach: Graph databases for lineage, search indexing, and RESTful APIs.

44. Design a disaster recovery and business continuity solution.

Answer: DR strategy:

  1. Backup Strategy: Automated backup scheduling and verification
  2. Data Replication: Real-time or near-real-time data replication
  3. Failover Automation: Automatic failover with health monitoring
  4. Recovery Testing: Regular DR testing and validation
  5. Documentation: Recovery procedures and contact information
  6. Communication: Incident communication and status updates
  7. Monitoring: System health and replication lag monitoring

Technical approach: Database replication, infrastructure as code, and automated testing.

45. Implement a sophisticated customer analytics solution.

Answer: Analytics platform:

  1. Data Integration: Multi-source customer data consolidation
  2. Customer 360: Unified customer profile and journey mapping
  3. Behavioral Analysis: User interaction and engagement analysis
  4. Segmentation: Dynamic customer segmentation based on behavior
  5. Predictive Analytics: Churn prediction and lifetime value modeling
  6. Real-time Processing: Live customer state updates
  7. Visualization: Interactive dashboards and reporting

Technical stack: Pandas for analysis, scikit-learn for ML, and visualization libraries.

Project-Based Real World Questions (5)

46. Build a complete e-commerce recommendation system that handles 1M+ users and real-time updates.

Answer: Project approach:

  1. Data Collection: User behavior tracking, product catalogs, purchase history
  2. Feature Engineering: User profiles, item features, interaction matrices
  3. Model Implementation: Collaborative filtering, content-based, and hybrid models
  4. Real-time Serving: API endpoints with sub-100ms response times
  5. A/B Testing: Recommendation algorithm comparison framework
  6. Scalability: Distributed computing for model training and serving
  7. Performance Monitoring: Track recommendation quality and business metrics
  8. Data Pipeline: ETL for feature updates and model retraining

47. Create a financial fraud detection system processing millions of transactions daily.

Answer: Fraud detection system:

  1. Real-time Processing: Stream processing for transaction monitoring
  2. Feature Engineering: Transaction patterns, user behavior, merchant analysis
  3. ML Models: Ensemble methods, anomaly detection, and deep learning
  4. Rule Engine: Business rules combined with ML predictions
  5. Alert Management: Risk scoring and investigation workflow
  6. Performance: Low latency prediction (<50ms per transaction)
  7. Compliance: Audit trails and regulatory reporting
  8. Feedback Loop: Continuous model improvement from investigations

48. Design a log analysis and monitoring platform for microservices architecture.

Answer: Monitoring platform:

  1. Log Ingestion: Multi-source log collection and parsing
  2. Real-time Analytics: Stream processing for immediate insights
  3. Alerting System: Intelligent alerting with noise reduction
  4. Dashboard: Real-time system health and performance metrics
  5. Anomaly Detection: Automated detection of unusual patterns
  6. Root Cause Analysis: Correlation analysis across services
  7. Scalability: Handle TB+ of logs daily
  8. Integration: API integrations with existing monitoring tools

49. Build a content management and delivery system for media streaming.

Answer: Media platform:

  1. Content Pipeline: Video processing, transcoding, and optimization
  2. CDN Integration: Global content distribution and caching
  3. User Management: Authentication, subscriptions, and preferences
  4. Recommendation Engine: Personalized content recommendations
  5. Analytics: Viewing patterns, engagement metrics, and performance
  6. Scalability: Handle millions of concurrent streams
  7. Quality Adaptation: Adaptive bitrate streaming
  8. Content Protection: DRM and piracy prevention

50. Create a data migration platform for legacy system modernization.

Answer: Migration platform:

  1. Assessment: Legacy system analysis and mapping
  2. Data Mapping: Source-to-target schema transformation
  3. ETL Pipeline: Incremental and full data migration
  4. Validation Framework: Data integrity and completeness verification
  5. Rollback Strategy: Safe migration with rollback capabilities
  6. Performance: Minimize downtime during migration
  7. Monitoring: Real-time migration progress and issue tracking
  8. Documentation: Complete migration documentation and procedures

Optimization Questions (5)

51. Your Python application processes 100K records but takes 30 minutes. How do you optimize it?

Answer: Optimization strategy:

  1. Profiling: Use cProfile and memory_profiler to identify bottlenecks
  2. Vectorization: Replace loops with pandas/numpy operations
  3. Parallel Processing: Use multiprocessing for CPU-bound tasks
  4. Database Optimization: Bulk operations instead of row-by-row processing
  5. Memory Management: Generator expressions instead of lists
  6. Caching: Cache expensive computations and database queries
  7. Algorithm Improvement: Use more efficient algorithms and data structures
# Before: Slow loop
results = [expensive_function(item) for item in large_list]

# After: Parallel processing
from multiprocessing import Pool
with Pool() as pool:
    results = pool.map(expensive_function, large_list)

52. How do you optimize memory usage for processing large datasets in Python?

Answer: Memory optimization:

  1. Generators: Use generators instead of storing everything in memory
  2. Chunking: Process data in smaller chunks
  3. Data Types: Use appropriate data types (int32 vs int64)
  4. Memory Mapping: Use mmap for large file processing
  5. Garbage Collection: Manual garbage collection for large objects
  6. Streaming: Process data as it comes instead of loading all
# Memory-efficient file processing
def process_large_file(filename):
    with open(filename, 'r') as file:
        for line in file:  # Generator, not loading all lines
            yield process_line(line)

# Chunked processing
def process_dataframe_chunks(df, chunk_size=10000):
    for i in range(0, len(df), chunk_size):
        chunk = df[i:i + chunk_size]
        yield process_chunk(chunk)

53. Optimize a Python web API that has slow response times under load.

Answer: API optimization:

  1. Async Programming: Use FastAPI/aiohttp for concurrent request handling
  2. Database Connection Pooling: Reuse database connections
  3. Caching: Implement Redis caching for frequently accessed data
  4. Query Optimization: Optimize database queries and use indexes
  5. Response Compression: Enable gzip compression
  6. Load Balancing: Distribute load across multiple instances
  7. Monitoring: Track response times and bottlenecks
# Async endpoint with caching
@app.get("/users/{user_id}")
async def get_user(user_id: int, redis: Redis = Depends(get_redis)):
    cache_key = f"user:{user_id}"
    cached_user = await redis.get(cache_key)
    if cached_user:
        return json.loads(cached_user)
    
    user = await database.fetch_user(user_id)
    await redis.setex(cache_key, 300, json.dumps(user))
    return user

54. How do you optimize Python code for CPU-intensive mathematical calculations?

Answer: Mathematical optimization:

  1. NumPy: Use vectorized operations instead of Python loops
  2. Numba: JIT compilation for numeric functions
  3. Cython: Compile Python to C for performance gains
  4. Parallel Computing: Use multiprocessing for independent calculations
  5. Algorithm Choice: Choose efficient algorithms (O(n) vs O(n²))
  6. Memory Layout: Optimize data structures for cache efficiency
import numpy as np
from numba import jit

# Optimized with NumPy and Numba
@jit(nopython=True)
def fast_calculation(data):
    return np.sum(data ** 2) / len(data)

# Vectorized operations
result = np.mean(large_array ** 2)  # Much faster than loops

55. Optimize a data pipeline that processes millions of records daily.

Answer: Pipeline optimization:

  1. Batch Processing: Process records in optimal batch sizes
  2. Parallel Workers: Use multiple processes/threads for parallel processing
  3. Database Bulk Operations: Use bulk inserts/updates instead of individual operations
  4. Memory Management: Stream processing to avoid memory issues
  5. Error Handling: Efficient error handling without stopping entire pipeline
  6. Monitoring: Track processing rates and identify bottlenecks
  7. Resource Allocation: Optimize CPU and memory usage
# Optimized batch processing
def process_records_batch(records, batch_size=1000):
    for i in range(0, len(records), batch_size):
        batch = records[i:i + batch_size]
        processed_batch = [transform_record(r) for r in batch]
        database.bulk_insert(processed_batch)  # Bulk operation

# Parallel processing
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor(max_workers=4) as executor:
    results = executor.map(process_batch, record_batches)

Cost Saving and Best Practices Questions (5)

56. What are the key strategies for writing maintainable and scalable Python code?

Answer: Best practices:

  1. Code Organization: Use proper module structure and separation of concerns
  2. Type Hints: Add type annotations for better code documentation
  3. Documentation: Write clear docstrings and maintain README files
  4. Testing: Comprehensive unit tests and integration tests
  5. Error Handling: Proper exception handling and logging
  6. Code Reviews: Implement peer review processes
  7. Linting: Use tools like pylint, black, and isort for code quality
from typing import List, Optional

def process_user_data(
    users: List[dict], 
    filter_active: bool = True
) -> Optional[List[dict]]:
    """
    Process user data with optional filtering.
    
    Args:
        users: List of user dictionaries
        filter_active: Whether to filter only active users
    
    Returns:
        Processed user data or None if input is invalid
    """
    if not users:
        return None
    
    if filter_active:
        users = [u for u in users if u.get('active', False)]
    
    return [transform_user(user) for user in users]

57. How do you implement efficient error handling and logging in Python applications?

Answer: Error handling best practices:

  1. Specific Exceptions: Catch specific exceptions, not generic Exception
  2. Logging Strategy: Use appropriate log levels and structured logging
  3. Error Context: Include relevant context in error messages
  4. Graceful Degradation: Handle errors without crashing the application
  5. Monitoring: Implement error tracking and alerting
  6. Documentation: Document expected exceptions and handling
import logging
import requests
from typing import Optional

logger = logging.getLogger(__name__)

def safe_api_call(url: str) -> Optional[dict]:
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        return response.json()
    except requests.exceptions.Timeout:
        logger.warning(f"API call timeout for URL: {url}")
        return None
    except requests.exceptions.HTTPError as e:
        logger.error(f"HTTP error {e.response.status_code} for URL: {url}")
        return None
    except ValueError as e:
        logger.error(f"Invalid JSON response from {url}: {e}")
        return None

58. What are the security best practices for Python applications?

Answer: Security practices:

  1. Input Validation: Validate and sanitize all user inputs
  2. SQL Injection Prevention: Use parameterized queries
  3. Secret Management: Never hardcode secrets, use environment variables
  4. Authentication: Implement proper authentication and authorization
  5. Dependencies: Keep dependencies updated and scan for vulnerabilities
  6. Logging Security: Don't log sensitive information
  7. HTTPS: Use secure communication protocols
import os

# Secure configuration
class Config:
    SECRET_KEY = os.getenv('SECRET_KEY')
    DATABASE_URL = os.getenv('DATABASE_URL')
    
    @classmethod
    def validate_config(cls):
        if not cls.SECRET_KEY:
            raise ValueError("SECRET_KEY environment variable not set")

# Secure database query
def get_user_by_id(user_id: int):
    query = "SELECT * FROM users WHERE id = %s"
    return database.execute(query, (user_id,))  # Parameterized query

59. How do you optimize Python application performance while minimizing infrastructure costs?

Answer: Cost optimization strategies:

  1. Efficient Algorithms: Choose algorithms with better time complexity
  2. Resource Monitoring: Track CPU and memory usage patterns
  3. Caching: Implement caching to reduce database and API calls
  4. Async Programming: Use async/await for I/O-bound operations
  5. Database Optimization: Optimize queries and use connection pooling
  6. Auto-scaling: Scale resources based on demand
  7. Profiling: Regular performance profiling and optimization
# Cost-effective caching decorator (cache-aside with Redis)
import json
import redis

redis_client = redis.Redis()

def cached_database_call(expiry=300):
    def decorator(func):
        def wrapper(*args, **kwargs):
            cache_key = f"{func.__name__}:{hash(str(args) + str(kwargs))}"
            
            # Try cache first
            cached_result = redis_client.get(cache_key)
            if cached_result:
                return json.loads(cached_result)
            
            # Call function and cache result
            result = func(*args, **kwargs)
            redis_client.setex(cache_key, expiry, json.dumps(result))
            return result
        return wrapper
    return decorator

60. What are the key considerations for deploying Python applications in production?

Answer: Production deployment:

  1. Environment Management: Use virtual environments and dependency management
  2. Configuration: Environment-based configuration management
  3. Process Management: Use process managers like Gunicorn or uWSGI
  4. Monitoring: Application performance and health monitoring
  5. Logging: Centralized logging and log rotation
  6. Security: Secure deployment practices and access controls
  7. Backup Strategy: Regular backups and disaster recovery plans
# Production configuration
import os
from dataclasses import dataclass

@dataclass
class ProductionConfig:
    debug: bool = False
    workers: int = int(os.getenv('WORKERS', '4'))
    log_level: str = os.getenv('LOG_LEVEL', 'INFO')
    database_url: str = os.getenv('DATABASE_URL')
    redis_url: str = os.getenv('REDIS_URL')
    
    def __post_init__(self):
        if not self.database_url:
            raise ValueError("DATABASE_URL must be set in production")

# Health check endpoint
@app.get("/health")
async def health_check():
    try:
        # Check database connectivity
        await database.fetch_one("SELECT 1")
        return {"status": "healthy", "timestamp": time.time()}
    except Exception as e:
        return {"status": "unhealthy", "error": str(e)}

Additional Scenario-Based Questions

Quick Coding Scenarios (Interview Favorites)

1. Remove duplicates from a string:

def remove_duplicates(s):
    return ''.join(dict.fromkeys(s))

2. Find first non-repeating character:

def first_non_repeat(s):
    char_count = {}
    for char in s:
        char_count[char] = char_count.get(char, 0) + 1
    for char in s:
        if char_count[char] == 1:
            return char
    return None

3. Check if two strings are anagrams:

def are_anagrams(s1, s2):
    return sorted(s1.lower()) == sorted(s2.lower())

4. Find maximum subarray sum (Kadane's algorithm):

def max_subarray_sum(arr):
    max_sum = current_sum = arr[0]
    for num in arr[1:]:
        current_sum = max(num, current_sum + num)
        max_sum = max(max_sum, current_sum)
    return max_sum

5. Implement LRU cache:

from collections import OrderedDict

class LRUCache:
    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = OrderedDict()
    
    def get(self, key):
        if key in self.cache:
            self.cache.move_to_end(key)
            return self.cache[key]
        return -1
    
    def put(self, key, value):
        if key in self.cache:
            self.cache.move_to_end(key)
        self.cache[key] = value
        if len(self.cache) > self.capacity:
            self.cache.popitem(last=False)

This comprehensive guide covers practical Python scenarios that are commonly asked in interviews, focusing on real-world problems with concise, implementable solutions.
