Skip to main content

Advanced Type Usage

...

Due to an unavailability to support all possible types in PostgreSQL, we have a way to encode Python types into PostgreSQL ones and decode wise versa.

This section has Advanced in the name because you'll need to work with raw bytes which can be difficult for some developers.

Pass unsupported type into PostgreSQL

If you are using some type that we don't support and want to insert it into PostgreSQL from PSQLPy, you must use PyCustomType class.

Let's assume we have table for_test in the database and PSQLPy doesn't support (only for demonstration) VARCHAR type:

database typedatabase column name
VARCHARnickname
from typing import Final

from psqlpy import ConnectionPool
from psqlpy.extra_types import PyCustomType


async def main() -> None:
    # It uses default connection parameters
    db_pool: Final = ConnectionPool()

    await db_pool.execute(
        "INSERT INTO for_test (nickname) VALUES ($1)",
        [PyCustomType(b"SomeDataInBytes")],
    )
    db_pool.close()

Here we pass PyCustomType into the parameters. It accepts only bytes.

Important

You must make bytes passed into PyCustomType readable for PostgreSQL. If bytes will be wrong, you will get an exception.

Decode unsupported type from PostgreSQL

When you retrieve some data from the PostgreSQL there are can be data types that we don't support yet. To deal with this situation, you can use custom_decoders parameter in result() and as_class() methods.

Let's assume we have table for_test in the database and PSQLPy doesn't support (only for demonstration) VARCHAR type:

database typedatabase column name
VARCHARnickname
from typing import Final, Any

from psqlpy import ConnectionPool, QueryResult
from psqlpy.extra_types import PyCustomType


def nickname_decoder(bytes_from_psql: bytes | None) -> str:
    return bytes_from_psql.decode() if bytes_from_psql else None


async def main() -> None:
    # It uses default connection parameters
    db_pool: Final = ConnectionPool()

    result: QueryResult = await db_pool.execute(
        "SELECT * FROM for_test",
        [PyCustomType(b"SomeDataInBytes")],
    )

    parsed_result: list[dict[str, Any]] = result.result(
        custom_decoders={
            "nickname": nickname_decoder,
        },
    )
    db_pool.close()

Important

Rules about custom_decoders parameter:

  • The key of the dict must be the name of the field on which you want to apply the decode function.
  • If you use aliases for the result field name, you must specify the alias.
  • The key of the dict must be in lowercase.