Skip to content

Commit

Permalink
refactor: improve lossless coding submodule
Browse files Browse the repository at this point in the history
- Fix Sardinas–Patterson algorithm implementation.
- Add support for partial parsing of prefix-free dictionaries.
- Implement `is_fully_covering` method for variable-to-fixed codes.
- Reorganize files in the submodule.
  • Loading branch information
rwnobrega committed Dec 31, 2024
1 parent 04a26fd commit ed1d173
Show file tree
Hide file tree
Showing 9 changed files with 827 additions and 328 deletions.
273 changes: 196 additions & 77 deletions src/komm/_lossless_coding/FixedToVariableCode.py

Large diffs are not rendered by default.

54 changes: 53 additions & 1 deletion src/komm/_lossless_coding/HuffmanCode.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
import heapq
from itertools import product
from math import prod
from typing import Literal

import numpy.typing as npt
from typing_extensions import Self

from .._util.information_theory import PMF
from .FixedToVariableCode import FixedToVariableCode
from .util import huffman_algorithm
from .util import Word


def HuffmanCode(
Expand All @@ -15,6 +19,8 @@ def HuffmanCode(
r"""
Binary Huffman code. It is an optimal (minimal expected rate) [fixed-to-variable length code](/ref/FixedToVariableCode) for a given probability mass function. For more details, see <cite>Say06, Sec. 3.2</cite>.
Notes:
Huffman codes are always [prefix-free](/ref/FixedToVariableCode/#is_prefix_free) (hence [uniquely decodable](/ref/FixedToVariableCode/#is_uniquely_decodable)).
Parameters:
pmf: The probability mass function of the source.
source_block_size: The source block size $k$. The default value is $k = 1$.
Expand Down Expand Up @@ -52,3 +58,49 @@ def HuffmanCode(
source_cardinality=pmf.size,
codewords=huffman_algorithm(pmf, source_block_size, policy),
)


def huffman_algorithm(
    pmf: PMF, source_block_size: int, policy: Literal["high", "low"]
) -> list[Word]:
    r"""
    Builds the codewords of a binary Huffman code for blocks of source symbols.

    Every block of `source_block_size` symbols is a leaf whose probability is
    the product of the probabilities of its symbols. The two least probable
    nodes are merged repeatedly until a single root remains; each leaf's
    codeword is the sequence of branch bits read from the root down to it.

    Parameters:
        pmf: The probabilities of the source symbols (any iterable of numbers).
        source_block_size: The number of source symbols per block.
        policy: Tie-breaking rule among nodes of equal probability. With
            `"high"` the node with the lowest index is taken first; with
            `"low"` the node with the highest index is taken first. Merged
            nodes always receive indices larger than every existing node.

    Returns:
        One codeword (a tuple of bits) per source block, in the order produced
        by `itertools.product(pmf, repeat=source_block_size)`.

    Raises:
        ValueError: If `policy` is neither `"high"` nor `"low"`.
    """
    if policy == "high":
        tie_sign = 1
    elif policy == "low":
        tie_sign = -1
    else:
        # An unknown policy used to make the comparison return None, which
        # silently produced an arbitrary merge order.
        raise ValueError("'policy' must be in {'high', 'low'}")

    leaf_probabilities = [
        prod(probs) for probs in product(pmf, repeat=source_block_size)
    ]
    num_leaves = len(leaf_probabilities)

    # Tree stored as parallel arrays indexed by node; leaves come first.
    parent: list[int | None] = [None] * num_leaves
    branch_bit: list[int] = [-1] * num_leaves

    # Heap entries are (probability, tie-break key, node index). Indices are
    # unique, so the first two fields already define a strict total order.
    heap = [(p, tie_sign * i, i) for i, p in enumerate(leaf_probabilities)]
    heapq.heapify(heap)
    while len(heap) > 1:
        p1, _, i1 = heapq.heappop(heap)
        p0, _, i0 = heapq.heappop(heap)
        branch_bit[i1] = 1
        branch_bit[i0] = 0
        merged = len(parent)
        parent[i0] = parent[i1] = merged
        parent.append(None)
        branch_bit.append(-1)
        heapq.heappush(heap, (p0 + p1, tie_sign * merged, merged))

    codewords: list[Word] = []
    for leaf in range(num_leaves):
        bits: list[int] = []
        node = leaf
        # Walk leaf -> root collecting bits, then flip once; this avoids the
        # quadratic cost of inserting at the front of the list.
        while (up := parent[node]) is not None:
            bits.append(branch_bit[node])
            node = up
        bits.reverse()
        codewords.append(tuple(bits))

    return codewords
34 changes: 31 additions & 3 deletions src/komm/_lossless_coding/TunstallCode.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import numpy as np
import heapq
from math import ceil, log2

import numpy.typing as npt
from typing_extensions import Self

from .._util.information_theory import PMF
from .util import tunstall_algorithm
from .util import Word
from .VariableToFixedCode import VariableToFixedCode


Expand All @@ -13,6 +16,9 @@ def TunstallCode(
r"""
Binary Tunstall code. It is an optimal (minimal expected rate) [variable-to-fixed length code](/ref/VariableToFixedCode) for a given probability mass function. For more details, see <cite>Say06, Sec. 3.7</cite>.
Notes:
Tunstall codes are always [prefix-free](/ref/VariableToFixedCode/#is_prefix_free) (hence [uniquely encodable](/ref/VariableToFixedCode/#is_uniquely_encodable)) and [fully covering](/ref/VariableToFixedCode/#is_fully_covering).
Parameters:
pmf: The probability mass function of the source.
target_block_size: The target block size $n$. Must satisfy $2^n \geq S$, where $S$ is the cardinality of the source alphabet, given by `len(pmf)`. The default value is $n = \lceil \log_2 S \rceil$.
Expand Down Expand Up @@ -42,10 +48,32 @@ def TunstallCode(
"""
pmf = PMF(pmf)
if target_block_size is None:
target_block_size = int(np.ceil(np.log2(pmf.size)))
target_block_size = ceil(log2(pmf.size))
elif 2**target_block_size < pmf.size:
raise ValueError("'target_block_size' is too low")
return VariableToFixedCode.from_sourcewords(
target_cardinality=2,
sourcewords=tunstall_algorithm(pmf, target_block_size),
)


def tunstall_algorithm(pmf: PMF, code_block_size: int) -> list[Word]:
    r"""
    Builds the sourcewords (dictionary) of a binary Tunstall code.

    Starting from the single-symbol words, the most probable word is removed
    repeatedly and replaced by its extensions with every source symbol, for as
    long as the resulting dictionary still fits in `2**code_block_size` words.

    Parameters:
        pmf: The probabilities of the source symbols (any iterable of numbers).
        code_block_size: The number of bits $n$ of each fixed-length codeword;
            the dictionary never exceeds $2^n$ words.

    Returns:
        The sourcewords (tuples of symbols), sorted in lexicographic order.
    """

    class Node:
        def __init__(self, symbols: Word, probability: float):
            self.symbols = symbols
            self.probability = probability

        def __lt__(self, other: Self) -> bool:
            # Reversed on purpose: heapq is a min-heap, and the word with the
            # largest probability must be popped first.
            return self.probability > other.probability

    queue = [Node((symbol,), probability) for symbol, probability in enumerate(pmf)]
    alphabet_size = len(queue)

    # Expanding a word replaces it by `alphabet_size` words. With fewer than
    # two symbols the dictionary never grows, so the loop below would not
    # terminate; the initial dictionary is already final.
    if alphabet_size < 2:
        return sorted(node.symbols for node in queue)

    heapq.heapify(queue)
    max_words = 2**code_block_size
    # One expansion adds `alphabet_size - 1` words. Expand while the result
    # still fits, *including* the case where it fills the dictionary exactly
    # (S + K(S - 1) <= 2^n).
    while len(queue) + alphabet_size - 1 <= max_words:
        best = heapq.heappop(queue)
        for symbol, probability in enumerate(pmf):
            heapq.heappush(
                queue, Node(best.symbols + (symbol,), best.probability * probability)
            )

    return sorted(node.symbols for node in queue)
Loading

0 comments on commit ed1d173

Please sign in to comment.