Source code for pihole_lib.domains

"""Pi-hole domain management operations."""

from urllib.parse import quote

from pihole_lib.base import BasePiHoleAPIClient
from pihole_lib.exceptions import PiHoleAPIError
from pihole_lib.models.domains import (
    Domain,
    DomainBatchDeleteItem,
    DomainKind,
    DomainMutationResponse,
    DomainRequest,
    DomainsResponse,
    DomainType,
)
from pihole_lib.utils import make_pihole_request


[docs] class PiHoleDomains(BasePiHoleAPIClient): """Pi-hole domain management operations. This class provides methods to manage domains on your Pi-hole instance, including adding, updating, deleting, and retrieving domains with various filtering options. Examples:: from pihole_lib import PiHoleClient, DomainType, DomainKind with PiHoleClient("http://192.168.1.100", password="secret") as client: domains = client.domains # Get all domains all_domains = domains.get_domains() # Get only allowed domains allowed_domains = domains.get_domains(domain_type=DomainType.ALLOW) # Get specific domain domain = domains.get_domain("example.com", DomainType.ALLOW, DomainKind.EXACT) # Add a new domain result = domains.add_domain( domain="badsite.com", domain_type=DomainType.DENY, domain_kind=DomainKind.EXACT, comment="Blocked site", groups=[0], enabled=True ) # Update a domain updated = domains.update_domain( domain="example.com", domain_type=DomainType.ALLOW, domain_kind=DomainKind.EXACT, comment="Updated comment", enabled=False ) # Delete a domain domains.delete_domain("badsite.com", DomainType.DENY, DomainKind.EXACT) # Batch delete domains domains.batch_delete_domains([ DomainBatchDeleteItem( item="site1.com", type=DomainType.DENY, kind=DomainKind.EXACT ), DomainBatchDeleteItem( item="site2.com", type=DomainType.DENY, kind=DomainKind.EXACT ) ]) """ BASE_URL = "/api/domains"
[docs] def get_domains( self, domain_type: DomainType | None = None, domain_kind: DomainKind | None = None, domain: str | None = None, ) -> list[Domain]: """Get domains with optional filtering. Args: domain_type: Filter by domain type (allow/deny). domain_kind: Filter by domain kind (exact/regex). domain: Filter by specific domain name. Returns: List of Domain objects matching the filters. Raises: PiHoleAPIError: API request failed. PiHoleConnectionError: Connection failed. PiHoleServerError: Server error. Examples:: # Get all domains all_domains = domains.get_domains() # Get only allowed domains allowed = domains.get_domains(domain_type=DomainType.ALLOW) # Get only exact domains exact = domains.get_domains(domain_kind=DomainKind.EXACT) # Get specific domain across all types/kinds specific = domains.get_domains(domain="example.com") # Get allowed exact domains allowed_exact = domains.get_domains( domain_type=DomainType.ALLOW, domain_kind=DomainKind.EXACT ) """ # Build the URL path based on filters path_parts = [self.BASE_URL] if domain_type: path_parts.append(domain_type.value) if domain_kind: path_parts.append(domain_kind.value) if domain: # URL encode the domain for safe transmission encoded_domain = quote(domain, safe="") path_parts.append(encoded_domain) url_path = "/".join(path_parts) response = make_pihole_request(self._client, "GET", url_path) data = response.json() domains_response = DomainsResponse(**data) return domains_response.domains
[docs] def get_domain( self, domain: str, domain_type: DomainType, domain_kind: DomainKind ) -> Domain | None: """Get a specific domain by exact match. Args: domain: The domain name to retrieve. domain_type: Type of domain (allow/deny). domain_kind: Kind of domain (exact/regex). Returns: Domain object if found, None otherwise. Raises: PiHoleAPIError: API request failed. PiHoleConnectionError: Connection failed. PiHoleServerError: Server error. Examples:: # Get specific allowed exact domain domain = domains.get_domain( "example.com", DomainType.ALLOW, DomainKind.EXACT ) if domain: print(f"Found domain: {domain.domain}") else: print("Domain not found") """ domains_list = self.get_domains(domain_type, domain_kind, domain) # Return the first match (should be unique) return domains_list[0] if domains_list else None
[docs] def add_domain( self, domain: str, domain_type: DomainType, domain_kind: DomainKind, comment: str | None = None, groups: list[int] | None = None, enabled: bool = True, ) -> DomainMutationResponse: r"""Add a new domain. Args: domain: The domain name or regex pattern to add. domain_type: Type of domain (allow/deny). domain_kind: Kind of domain (exact/regex). comment: Optional comment for the domain. groups: List of group IDs. Defaults to [0] if not provided. enabled: Whether the domain is enabled. Defaults to True. Returns: DomainMutationResponse with the operation results. Raises: PiHoleAPIError: API request failed or domain already exists. PiHoleConnectionError: Connection failed. PiHoleServerError: Server error. Examples:: # Add an exact blocked domain result = domains.add_domain( domain="badsite.com", domain_type=DomainType.DENY, domain_kind=DomainKind.EXACT, comment="Malicious site", groups=[0, 1], enabled=True ) # Add a regex pattern result = domains.add_domain( domain=r".*\.ads\..*", domain_type=DomainType.DENY, domain_kind=DomainKind.REGEX, comment="Block ads subdomains" ) """ if groups is None: groups = [0] url_path = f"{self.BASE_URL}/{domain_type.value}/{domain_kind.value}" request_data = DomainRequest( domain=domain, type=None, kind=None, comment=comment, groups=groups, enabled=enabled, ) # Convert to dict and remove None values payload = {k: v for k, v in request_data.model_dump().items() if v is not None} response = make_pihole_request(self._client, "POST", url_path, json=payload) data = response.json() return DomainMutationResponse(**data)
[docs] def update_domain( self, domain: str, domain_type: DomainType, domain_kind: DomainKind, new_type: DomainType | None = None, new_kind: DomainKind | None = None, comment: str | None = None, groups: list[int] | None = None, enabled: bool | None = None, ) -> DomainMutationResponse: """Update an existing domain. Args: domain: The domain name to update. domain_type: Current type of domain (allow/deny). domain_kind: Current kind of domain (exact/regex). new_type: New type to move domain to (optional). new_kind: New kind to move domain to (optional). comment: New comment for the domain. groups: New list of group IDs. enabled: New enabled status. Returns: DomainMutationResponse with the operation results. Raises: PiHoleAPIError: API request failed or domain not found. PiHoleConnectionError: Connection failed. PiHoleServerError: Server error. Examples:: # Update domain comment and disable it result = domains.update_domain( domain="example.com", domain_type=DomainType.ALLOW, domain_kind=DomainKind.EXACT, comment="Updated comment", enabled=False ) # Move domain from allow to deny result = domains.update_domain( domain="example.com", domain_type=DomainType.ALLOW, domain_kind=DomainKind.EXACT, new_type=DomainType.DENY, new_kind=DomainKind.EXACT ) """ # URL encode the domain for safe transmission encoded_domain = quote(domain, safe="") url_path = ( f"{self.BASE_URL}/{domain_type.value}/{domain_kind.value}/{encoded_domain}" ) request_data = DomainRequest( domain=None, type=new_type, kind=new_kind, comment=comment, groups=groups, enabled=enabled, ) # Convert to dict and remove None values payload = {k: v for k, v in request_data.model_dump().items() if v is not None} response = make_pihole_request(self._client, "PUT", url_path, json=payload) data = response.json() return DomainMutationResponse(**data)
[docs] def delete_domain( self, domain: str, domain_type: DomainType, domain_kind: DomainKind ) -> None: r"""Delete a domain. Args: domain: The domain name to delete. domain_type: Type of domain (allow/deny). domain_kind: Kind of domain (exact/regex). Raises: PiHoleAPIError: API request failed or domain not found. PiHoleConnectionError: Connection failed. PiHoleServerError: Server error. Examples:: # Delete an exact blocked domain domains.delete_domain( "badsite.com", DomainType.DENY, DomainKind.EXACT ) # Delete a regex pattern domains.delete_domain( r".*\.ads\..*", DomainType.DENY, DomainKind.REGEX ) """ # URL encode the domain for safe transmission encoded_domain = quote(domain, safe="") url_path = ( f"/api/domains/{domain_type.value}/{domain_kind.value}/{encoded_domain}" ) make_pihole_request(self._client, "DELETE", url_path)
[docs] def batch_delete_domains(self, domains: list[DomainBatchDeleteItem]) -> bool: r"""Delete multiple domains in a single request. Args: domains: List of DomainBatchDeleteItem objects specifying domains to delete. Returns: True if successful. Raises: PiHoleAPIError: API request failed. PiHoleConnectionError: Connection failed. PiHoleServerError: Server error. Examples:: # Delete multiple domains success = domains.batch_delete_domains([ DomainBatchDeleteItem( item="site1.com", type=DomainType.DENY, kind=DomainKind.EXACT ), DomainBatchDeleteItem( item="site2.com", type=DomainType.DENY, kind=DomainKind.EXACT ), DomainBatchDeleteItem( item=r".*\.ads\..*", type=DomainType.DENY, kind=DomainKind.REGEX ) ]) print(f"Batch delete successful: {success}") """ url_path = f"{self.BASE_URL}:batchDelete" # Convert to list of dicts payload = [domain.model_dump() for domain in domains] response = make_pihole_request(self._client, "POST", url_path, json=payload) if response.status_code == 204: # No Content - success return True raise PiHoleAPIError( f"{response.json().get('error', {}).get('message', 'Unknown error')}" )