# Third-party imports
from typing import Union
import numpy as np
import openfhe
# Local imports
from openfhe_numpy._onp_cpp import *
from openfhe_numpy.utils.constants import DataType
from openfhe_numpy.utils.log import ONP_ERROR
from openfhe_numpy.utils.packing import (
# _get_shape,
_pack_matrix_col_wise,
_pack_matrix_row_wise,
_pack_vector_row_wise,
_pack_vector_col_wise,
)
from openfhe_numpy.utils.typecheck import *
from openfhe_numpy.utils.matlib import is_power_of_two
from openfhe_numpy.tensor.ctarray import CTArray
from openfhe_numpy.tensor.ptarray import PTArray
from openfhe_numpy.tensor.tensor import FHETensor
# TODO: constructor for block matrix
def _get_block_dimensions(data, slots) -> tuple[int, int]:
pass
# @validate_call
def _pack_array(
data: np.ndarray | Number | list,
batch_size: int,
order: int = ROW_MAJOR,
mode: str = "repeat",
**kwargs,
):
"""
Helper function to flatten a scalar, vector, or matrix into a 1D array and fill all slots by either padding with zeros or duplicating the array contents.
Parameters
----------
data : list | tuple | np.ndarray | int
batch_size : int
The number of available plaintext slots
order : int, optional
The packing/encoding style, by default ArrayEncodingType.ROW_MAJOR
repeats : int, optional
The number of repetitions.
When repeats = -1, the the array content is duplicated all avaialble slots. Ortherwise, it will repeats n time. when repeats = 1, means only repeat 1 times,
Example: batch_size = 32, encoding type is row-major, repeats
Case 1: repeats = -1
[[1 2 3], [4 5 6]] -> 1 2 3 0 4 5 6 0 | 1 2 3 0 4 5 6 0 | 1 2 3 0 4 5 6 0 | 1 2 3 0 4 5 6 0
[[1], [2], [3]] -> 1 1 1 1 | 2 2 2 2 | 3 3 3 3 | 0 0 0 0
[1 2 3] -> 1 2 3 0 1 2 3 0 | 1 2 3 0 1 2 3 0 | 1 2 3 0 1 2 3 0 | 1 2 3 0 1 2 3 0
[1] -> 1 0 0 0 | 1 0 0 0 | 1 0 0 0 | 1 0 0 0 | 1 0 0 0 | 1 0 0 0 | 1 0 0 0 | 1 0 0 0
Returns
-------
_type_
_description_
"""
if batch_size < 0:
ONP_ERROR("The batch size cannot be negative.")
if not is_power_of_two(batch_size):
ONP_ERROR(f"Batch size [{batch_size}] must be a power of two")
# should follow numpy standards to avoid confusion
# org_rows, org_cols, ndim = _get_shape(data)
if is_numeric_scalar(data):
if mode == "zero":
packed_data = np.zeros(batch_size, dtype=type(data))
packed_data[0] = data
elif mode == "repeat":
packed_data = np.full(batch_size, data)
else:
ONP_ERROR(f"Invalid padding mode: '{mode}'. Valid options are 'zero' or 'repeat'.")
elif is_numeric_arraylike(data):
if data.ndim == 2:
packed_data, shape = _ravel_matrix(data, batch_size, order, True, mode, **kwargs)
elif data.ndim == 1:
packed_data, shape = _ravel_vector(data, batch_size, order, True, mode, **kwargs)
else:
ONP_ERROR("Not support dimension [{ndim}]")
else:
ONP_ERROR("Input is not numeric")
return {
"data": packed_data,
"original_shape": data.shape,
"ndim": data.ndim,
"batch_size": batch_size,
"shape": shape,
"order": order,
}
[docs]
def array(
cc: openfhe.CryptoContext,
data: list | tuple | np.ndarray | int,
batch_size: int = -1,
order: int = ROW_MAJOR,
type: str = DataType.CIPHERTEXT,
mode: str = "repeat",
package: dict = {},
public_key: openfhe.PublicKey = None,
**kwargs,
) -> FHETensor:
"""
Construct either a ciphertext or plaintext (CTArray/PTArray) from raw input data.
Parameters
----------
cc : CryptoContext
The OpenFHE CryptoContext.
data : matrix/vector/int
batch_size : int
Number of total plaintext batch_size.
order : int
Encoding order: ArrayEncodingType.ROW_MAJOR or COL_MAJOR.
type : str
DataType.CIPHERTEXT or PLAINTEXT.
public_key : optional
Public key needed for encryption if the output is encrypted
Returns
- A matrix has a dimension of m x n (m rows x n cols)
- A vector is considered as n x 1 matrix
- A number is considered as a vector with duplicated entries: [1 1 1 1]
-------
FHETensor Object
"""
if package == {}:
package = _pack_array(data, batch_size, order, mode, **kwargs)
packed_data = package["data"]
ndim = package["ndim"]
batch_size = package["batch_size"]
shape = package["shape"]
order = package["order"]
plaintext = cc.MakeCKKSPackedPlaintext(packed_data)
print(packed_data[:64])
if type == DataType.PLAINTEXT:
result = PTArray(plaintext, package["original_shape"], ndim, batch_size, shape, order)
else:
if public_key is None:
raise ValueError("Public key must be provided for ciphertext encoding.")
ciphertext = cc.Encrypt(public_key, plaintext)
result = CTArray(ciphertext, package["original_shape"], batch_size, shape, order, True)
result.set_shape(shape)
result.set_batch_size(batch_size)
return result
def _ravel_matrix(data, batch_size, order=ROW_MAJOR, pad_to_pow2=True, mode="repeat", **kwargs):
"""
Encode a 2D matrix into a packed array.
Returns
-------
Plaintext
"""
if order == ROW_MAJOR:
packed_data, shape = _pack_matrix_row_wise(data, batch_size, pad_to_pow2, mode)
elif order == COL_MAJOR:
packed_data, shape = _pack_matrix_col_wise(data, batch_size, pad_to_pow2, mode)
else:
raise ValueError("Unsupported encoding order")
return packed_data, shape
def _ravel_vector(data, batch_size, order=ROW_MAJOR, pad_to_pow2=True, tile="repeats", **kwargs):
"""
Encode a 1D vector into a packed array.
Parameters
----------
data : list
repeats : int
Number of repeats.
order : ArrayEncodingType
Returns
-------
Plaintext
"""
target_cols = 0
if "target_cols" in kwargs:
if isinstance(kwargs["target_cols"], int) and kwargs["target_cols"] > 0:
target_cols = kwargs.get("target_cols")
pad_value = kwargs.get("pad_value", "repeat")
expand = kwargs.get("expand", "repeat")
if order == ROW_MAJOR:
packed_data, shape = _pack_vector_row_wise(data, batch_size, target_cols, expand, tile, pad_to_pow2, pad_value)
elif order == COL_MAJOR:
packed_data, shape = _pack_vector_col_wise(data, batch_size, target_cols, expand, tile, pad_to_pow2, pad_value)
else:
raise ONP_ERROR("Unsupported encoding order")
return packed_data, shape