Valkey Manager
Manager for starting and stopping a Valkey server subprocess.
This module provides the ValkeyManager class that automatically manages a Valkey
server instance for use by the PITA package. It can detect existing Valkey instances
and start a new one if needed.
ValkeyManager
Manager for starting and stopping a Valkey server subprocess.
This class provides utilities to automatically manage a Valkey server instance
for use by the PITA package. It can detect existing Valkey instances and start
a new one if needed.
Attributes:
| Name |
Type |
Description |
_process |
Optional[Popen]
|
The subprocess.Popen instance for the managed Valkey server, or None.
|
Source code in pita/utils/valkey_manager.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140 | class ValkeyManager:
"""
Manager for starting and stopping a Valkey server subprocess.
This class provides utilities to automatically manage a Valkey server instance
for use by the PITA package. It can detect existing Valkey instances and start
a new one if needed.
Attributes:
_process: The subprocess.Popen instance for the managed Valkey server, or None.
"""
_process: Optional[subprocess.Popen] = None
@classmethod
def start(cls) -> None:
"""Starts the Valkey server if it is not already running."""
# Check if we already started it
if cls._process is not None:
return
# Check if already running system-wide or by another process
if cls._is_valkey_running():
return
valkey_executable = cls._find_valkey_executable()
if not valkey_executable:
print("Warning: valkey-server executable not found. Please ensure it is installed.")
return
print(f"Starting valkey-server on port {VALKEY_PORT}...")
try:
# Start valkey-server as a subprocess
# We use --daemonize no so we can control the subprocess lifecycle easily
cls._process = subprocess.Popen(
[valkey_executable, '--port', str(VALKEY_PORT), '--save', '', '--appendonly', 'no'],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
# Wait a moment to ensure it starts
time.sleep(0.5)
if cls._process.poll() is not None:
print("Failed to start valkey-server subprocess.")
cls._process = None
else:
print("Valkey server started successfully.")
# Register cleanup on exit
atexit.register(cls.stop)
except Exception as e:
print(f"Failed to start valkey-server: {e}")
@classmethod
def stop(cls) -> None:
"""
Stop the Valkey server if it was started by this manager.
This method attempts to gracefully terminate the Valkey process, waiting up to
2 seconds before forcefully killing it if necessary.
"""
if cls._process:
print("Stopping valkey-server...")
cls._process.terminate()
try:
cls._process.wait(timeout=2)
except subprocess.TimeoutExpired:
cls._process.kill()
cls._process = None
@classmethod
def _is_valkey_running(cls) -> bool:
"""
Check if a Valkey or Redis server process is currently running on the system.
Since Valkey and Redis are protocol-compatible, we check for both server types
to avoid starting a new instance when an existing compatible server is running.
Returns:
True if a valkey-server or redis-server process is found, False otherwise.
"""
for proc in psutil.process_iter(['name']):
try:
proc_name = proc.info['name']
if 'valkey-server' in proc_name or 'redis-server' in proc_name:
# Could add stricter check for port/args, but simplified for now
return True
except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
# Process may have terminated or be inaccessible; safely skip it and continue scanning
pass
return False
@classmethod
def _find_valkey_executable(cls) -> Optional[str]:
"""
Find the valkey-server executable in various system locations.
This method searches for valkey-server in the system PATH, conda environments,
and relative to the Python executable.
Returns:
The absolute path to the valkey-server executable, or None if not found.
"""
import sys
executable = shutil.which('valkey-server')
if executable:
return executable
# Fallback to checking conda env bin if not in PATH
conda_prefix = os.environ.get('CONDA_PREFIX')
if conda_prefix:
candidate = os.path.join(conda_prefix, 'bin', 'valkey-server')
if os.path.exists(candidate):
return candidate
# Fallback relative to sys.executable (common in conda envs)
# sys.executable is .../bin/python, so we look in .../bin/valkey-server
bin_dir = os.path.dirname(sys.executable)
candidate = os.path.join(bin_dir, 'valkey-server')
if os.path.exists(candidate):
return candidate
return None
|
start() -> None
classmethod
Starts the Valkey server if it is not already running.
Source code in pita/utils/valkey_manager.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69 | @classmethod
def start(cls) -> None:
"""Starts the Valkey server if it is not already running."""
# Check if we already started it
if cls._process is not None:
return
# Check if already running system-wide or by another process
if cls._is_valkey_running():
return
valkey_executable = cls._find_valkey_executable()
if not valkey_executable:
print("Warning: valkey-server executable not found. Please ensure it is installed.")
return
print(f"Starting valkey-server on port {VALKEY_PORT}...")
try:
# Start valkey-server as a subprocess
# We use --daemonize no so we can control the subprocess lifecycle easily
cls._process = subprocess.Popen(
[valkey_executable, '--port', str(VALKEY_PORT), '--save', '', '--appendonly', 'no'],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL
)
# Wait a moment to ensure it starts
time.sleep(0.5)
if cls._process.poll() is not None:
print("Failed to start valkey-server subprocess.")
cls._process = None
else:
print("Valkey server started successfully.")
# Register cleanup on exit
atexit.register(cls.stop)
except Exception as e:
print(f"Failed to start valkey-server: {e}")
|
stop() -> None
classmethod
Stop the Valkey server if it was started by this manager.
This method attempts to gracefully terminate the Valkey process, waiting up to
2 seconds before forcefully killing it if necessary.
Source code in pita/utils/valkey_manager.py
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86 | @classmethod
def stop(cls) -> None:
"""
Stop the Valkey server if it was started by this manager.
This method attempts to gracefully terminate the Valkey process, waiting up to
2 seconds before forcefully killing it if necessary.
"""
if cls._process:
print("Stopping valkey-server...")
cls._process.terminate()
try:
cls._process.wait(timeout=2)
except subprocess.TimeoutExpired:
cls._process.kill()
cls._process = None
|
Overview
The valkey_manager module provides utilities for managing Valkey connections and data storage, primarily used for inter-process communication between the main inference process and logits processors in vLLM and TensorRT backends.
Use Cases
Valkey is used in PITA for:
- Logits Processor Communication: vLLM and TensorRT run logits processors in separate threads. These processors calculate normalization constants and entropy values, storing them in Valkey for retrieval by the main process.
- Temporary Data Storage: Metrics are stored temporarily and automatically cleaned up after retrieval.
- Multi-threaded Coordination: Enables safe data sharing between the inference engine threads and the main application.
Configuration
Valkey connection parameters are configured via environment variables (see constants):
VALKEY_HOST: Valkey server hostname (default: "localhost")
VALKEY_PORT: Valkey server port (default: 6379)
VALKEY_DB: Valkey database number (default: 0)
Requirements
Valkey is required for the following backends when logits_processor=True:
- vLLM backend with entropy or power distribution metrics
- TensorRT backend with entropy or power distribution metrics
Install Valkey:
# Ubuntu/Debian
sudo apt-get install valkey-server
# macOS
brew install valkey
# Or via pip for Python client
pip install valkey
Start Valkey server:
valkey-server
Example Usage
The Valkey manager is typically used internally by the logits processors, but you can access it directly if needed:
from pita.utils.valkey_manager import ValkeyManager
from pita.utils.constants import VALKEY_HOST, VALKEY_PORT, VALKEY_DB
# Initialize Valkey connection (usually done internally)
valkey_client = ValkeyManager(
host=VALKEY_HOST,
port=VALKEY_PORT,
db=VALKEY_DB
)
# The logits processors use Valkey to store metrics
# These are automatically retrieved and cleaned up by the sampler
For most users, Valkey integration is transparent - simply ensure Valkey is running when using vLLM or TensorRT backends with logits_processor=True.