Valkey Manager

Manager for starting and stopping a Valkey server subprocess.

This module provides the ValkeyManager class that automatically manages a Valkey server instance for use by the PITA package. It can detect existing Valkey instances and start a new one if needed.

ValkeyManager

Manager for starting and stopping a Valkey server subprocess.

This class provides utilities to automatically manage a Valkey server instance for use by the PITA package. It can detect existing Valkey instances and start a new one if needed.

Attributes:

Name      Type             Description
_process  Optional[Popen]  The subprocess.Popen instance for the managed Valkey server, or None.

Source code in pita/utils/valkey_manager.py
class ValkeyManager:
    """
    Manager for starting and stopping a Valkey server subprocess.

    This class provides utilities to automatically manage a Valkey server instance
    for use by the PITA package. It can detect existing Valkey instances and start
    a new one if needed.

    Attributes:
        _process: The subprocess.Popen instance for the managed Valkey server, or None.
    """
    _process: Optional[subprocess.Popen] = None

    @classmethod
    def start(cls) -> None:
        """Starts the Valkey server if it is not already running."""
        # Check if we already started it
        if cls._process is not None:
            return

        # Check if already running system-wide or by another process
        if cls._is_valkey_running():
            return

        valkey_executable = cls._find_valkey_executable()

        if not valkey_executable:
            print("Warning: valkey-server executable not found. Please ensure it is installed.")
            return

        print(f"Starting valkey-server on port {VALKEY_PORT}...")
        try:
            # Start valkey-server as a subprocess
            # valkey-server runs in the foreground by default (daemonize no),
            # so we can control the subprocess lifecycle easily
            cls._process = subprocess.Popen(
                [valkey_executable, '--port', str(VALKEY_PORT), '--save', '', '--appendonly', 'no'],
                stdout=subprocess.DEVNULL,
                stderr=subprocess.DEVNULL
            )

            # Wait a moment to ensure it starts
            time.sleep(0.5)
            if cls._process.poll() is not None:
                print("Failed to start valkey-server subprocess.")
                cls._process = None
            else:
                print("Valkey server started successfully.")
                # Register cleanup on exit
                atexit.register(cls.stop)

        except Exception as e:
            print(f"Failed to start valkey-server: {e}")

    @classmethod
    def stop(cls) -> None:
        """
        Stop the Valkey server if it was started by this manager.

        This method attempts to gracefully terminate the Valkey process, waiting up to
        2 seconds before forcefully killing it if necessary.
        """
        if cls._process:
            print("Stopping valkey-server...")
            cls._process.terminate()
            try:
                cls._process.wait(timeout=2)
            except subprocess.TimeoutExpired:
                cls._process.kill()
            cls._process = None

    @classmethod
    def _is_valkey_running(cls) -> bool:
        """
        Check if a Valkey or Redis server process is currently running on the system.

        Since Valkey and Redis are protocol-compatible, we check for both server types
        to avoid starting a new instance when an existing compatible server is running.

        Returns:
            True if a valkey-server or redis-server process is found, False otherwise.
        """
        for proc in psutil.process_iter(['name']):
            try:
                # proc.info['name'] can be None when the name is unavailable
                proc_name = proc.info['name'] or ''
                if 'valkey-server' in proc_name or 'redis-server' in proc_name:
                    # Could add stricter check for port/args, but simplified for now
                    return True
            except (psutil.NoSuchProcess, psutil.AccessDenied, psutil.ZombieProcess):
                # Process may have terminated or be inaccessible; safely skip it and continue scanning
                pass
        return False

    @classmethod
    def _find_valkey_executable(cls) -> Optional[str]:
        """
        Find the valkey-server executable in various system locations.

        This method searches for valkey-server in the system PATH, conda environments,
        and relative to the Python executable.

        Returns:
            The absolute path to the valkey-server executable, or None if not found.
        """
        import sys
        executable = shutil.which('valkey-server')
        if executable:
            return executable

        # Fallback to checking conda env bin if not in PATH
        conda_prefix = os.environ.get('CONDA_PREFIX')
        if conda_prefix:
            candidate = os.path.join(conda_prefix, 'bin', 'valkey-server')
            if os.path.exists(candidate):
                return candidate

        # Fallback relative to sys.executable (common in conda envs)
        # sys.executable is .../bin/python, so we look in .../bin/valkey-server
        bin_dir = os.path.dirname(sys.executable)
        candidate = os.path.join(bin_dir, 'valkey-server')
        if os.path.exists(candidate):
            return candidate

        return None
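
The comment in _is_valkey_running notes that a stricter port check could be added. One possible approach, sketched here under assumptions, is to probe the configured port with an inline PING and look for the +PONG reply that Valkey and Redis servers send. The helper name responds_to_ping is hypothetical and not part of PITA, and a server that requires authentication answers with an error instead, which this sketch treats as "not available".

```python
import socket


def responds_to_ping(host: str, port: int, timeout: float = 0.5) -> bool:
    """Return True if something on (host, port) answers an inline PING with +PONG.

    Hypothetical helper for illustration; not part of the PITA package.
    """
    try:
        with socket.create_connection((host, port), timeout=timeout) as conn:
            conn.settimeout(timeout)
            # Inline command form of the Redis/Valkey protocol
            conn.sendall(b"PING\r\n")
            reply = conn.recv(64)
    except OSError:
        # Connection refused, timed out, or reset: treat as no usable server
        return False
    return reply.startswith(b"+PONG")
```

Compared with scanning process names, a probe like this also distinguishes a server listening on a different port from one listening on the port PITA expects.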

start() -> None classmethod

Starts the Valkey server if it is not already running.

Source code in pita/utils/valkey_manager.py
@classmethod
def start(cls) -> None:
    """Starts the Valkey server if it is not already running."""
    # Check if we already started it
    if cls._process is not None:
        return

    # Check if already running system-wide or by another process
    if cls._is_valkey_running():
        return

    valkey_executable = cls._find_valkey_executable()

    if not valkey_executable:
        print("Warning: valkey-server executable not found. Please ensure it is installed.")
        return

    print(f"Starting valkey-server on port {VALKEY_PORT}...")
    try:
        # Start valkey-server as a subprocess
        # valkey-server runs in the foreground by default (daemonize no),
        # so we can control the subprocess lifecycle easily
        cls._process = subprocess.Popen(
            [valkey_executable, '--port', str(VALKEY_PORT), '--save', '', '--appendonly', 'no'],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL
        )

        # Wait a moment to ensure it starts
        time.sleep(0.5)
        if cls._process.poll() is not None:
            print("Failed to start valkey-server subprocess.")
            cls._process = None
        else:
            print("Valkey server started successfully.")
            # Register cleanup on exit
            atexit.register(cls.stop)

    except Exception as e:
        print(f"Failed to start valkey-server: {e}")
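
start() waits a fixed 0.5 seconds and then checks that the subprocess is still alive. A possible alternative, shown as a minimal sketch, is to poll the port until it accepts TCP connections or a deadline passes. The function name wait_for_port and its default timings are assumptions chosen for illustration; they are not part of the module.

```python
import socket
import time


def wait_for_port(host: str, port: int, timeout: float = 5.0, interval: float = 0.1) -> bool:
    """Return True once a TCP connection to (host, port) succeeds, False on timeout.

    Hypothetical helper for illustration; not part of the PITA package.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            # A successful connect means the server is accepting clients
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            # Not listening yet: back off briefly and retry
            time.sleep(interval)
    return False
```

Polling returns as soon as the server is reachable, so a fast start does not pay the full delay and a slow start is not misreported as ready.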

stop() -> None classmethod

Stop the Valkey server if it was started by this manager.

This method attempts to gracefully terminate the Valkey process, waiting up to 2 seconds before forcefully killing it if necessary.

Source code in pita/utils/valkey_manager.py
@classmethod
def stop(cls) -> None:
    """
    Stop the Valkey server if it was started by this manager.

    This method attempts to gracefully terminate the Valkey process, waiting up to
    2 seconds before forcefully killing it if necessary.
    """
    if cls._process:
        print("Stopping valkey-server...")
        cls._process.terminate()
        try:
            cls._process.wait(timeout=2)
        except subprocess.TimeoutExpired:
            cls._process.kill()
        cls._process = None
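
The terminate-then-kill pattern used by stop() can be tried in isolation on any subprocess. The sketch below is a standalone illustration, not the module's code: stop_process is a hypothetical name, and the extra wait() after kill() is an addition so that the child is reaped and its return code is available.

```python
import subprocess


def stop_process(proc: subprocess.Popen, grace: float = 2.0) -> int:
    """Terminate proc, escalating to kill() if it is still alive after `grace` seconds.

    Hypothetical helper for illustration; not part of the PITA package.
    """
    proc.terminate()  # polite request to exit (SIGTERM on POSIX)
    try:
        proc.wait(timeout=grace)
    except subprocess.TimeoutExpired:
        proc.kill()  # forceful stop
        proc.wait()  # reap the child so no zombie process is left behind
    return proc.returncode
```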

Overview

The valkey_manager module manages the Valkey server that PITA uses for inter-process communication between the main inference process and the logits processors in the vLLM and TensorRT backends.

Use Cases

Valkey is used in PITA for:

  • Logits Processor Communication: vLLM and TensorRT run logits processors in separate threads. These processors calculate normalization constants and entropy values, storing them in Valkey for retrieval by the main process.
  • Temporary Data Storage: Metrics are stored temporarily and automatically cleaned up after retrieval.
  • Multi-threaded Coordination: Enables safe data sharing between the inference engine threads and the main application.
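
The store-then-retrieve-and-clean-up flow described above might look roughly like the following sketch. Everything in it is an assumption made for illustration: the key layout pita:metrics:<request_id>, the JSON encoding, and the function names are hypothetical, and the real logits processors may organize their data differently. The functions accept any client exposing set, get, and delete, which is how Redis-compatible Python clients typically behave.

```python
import json
from typing import Any, Optional, Protocol


class KeyValueClient(Protocol):
    """Minimal client interface assumed by this sketch."""

    def set(self, name: str, value: str) -> Any: ...
    def get(self, name: str) -> Any: ...
    def delete(self, *names: str) -> Any: ...


def _metrics_key(request_id: str) -> str:
    # Hypothetical key layout, chosen only for this example
    return f"pita:metrics:{request_id}"


def store_metrics(client: KeyValueClient, request_id: str, metrics: dict) -> None:
    """Producer side: serialize the metrics and write them under the request's key."""
    client.set(_metrics_key(request_id), json.dumps(metrics))


def pop_metrics(client: KeyValueClient, request_id: str) -> Optional[dict]:
    """Consumer side: read the metrics once, then delete the key."""
    key = _metrics_key(request_id)
    raw = client.get(key)
    if raw is None:
        return None
    client.delete(key)
    return json.loads(raw)
```

Deleting the key right after a successful read keeps the store from accumulating stale entries, which matches the temporary-storage behavior listed above.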

Configuration

Valkey connection parameters are configured via environment variables (see constants):

  • VALKEY_HOST: Valkey server hostname (default: "localhost")
  • VALKEY_PORT: Valkey server port (default: 6379)
  • VALKEY_DB: Valkey database number (default: 0)
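
As a rough illustration of how such settings can be read with the defaults listed above, consider the sketch below. The function load_valkey_settings is hypothetical; the actual constants module may parse or validate these variables differently.

```python
import os
from typing import Mapping, Optional


def load_valkey_settings(env: Optional[Mapping[str, str]] = None) -> dict:
    """Read Valkey connection settings, falling back to the documented defaults.

    Hypothetical helper for illustration; not part of the PITA package.
    """
    env = os.environ if env is None else env
    return {
        "host": env.get("VALKEY_HOST", "localhost"),
        # Environment variables are strings, so numeric settings need converting
        "port": int(env.get("VALKEY_PORT", "6379")),
        "db": int(env.get("VALKEY_DB", "0")),
    }
```

Passing a plain dictionary as env makes it possible to exercise the parsing without modifying the process environment.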

Requirements

Valkey is required for the following backends when logits_processor=True:

  • vLLM backend with entropy or power distribution metrics
  • TensorRT backend with entropy or power distribution metrics

Install Valkey:

# Ubuntu/Debian
sudo apt-get install valkey-server

# macOS
brew install valkey

# Python client library only (does not install valkey-server)
pip install valkey

Start Valkey server:

valkey-server

Example Usage

The Valkey manager is typically used internally by the logits processors, but you can access it directly if needed:

from pita.utils.valkey_manager import ValkeyManager

# Start a local valkey-server if none is already running (usually done internally)
ValkeyManager.start()

# Stop the server if this manager started it; start() also registers this
# with atexit, so an explicit call is optional
ValkeyManager.stop()

# The logits processors use Valkey to store metrics
# These are automatically retrieved and cleaned up by the sampler

For most users, Valkey integration is transparent: simply ensure Valkey is installed or already running when using the vLLM or TensorRT backends with logits_processor=True.