Transfer zones from alternative providers to STACKIT DNS
This section presents a series of instructional guides detailing the process of transitioning zones and associated resource records from alternative service providers to the STACKIT DNS environment.
The procedure to transfer zones from AWS Route53 to STACKIT DNS is as follows: Initially, it is necessary to establish specific environment variables, followed by the execution of a Python script.
export STACKIT_PROJECT_ID="0ef237fb-00dd-4cec-a5a4-5855a2dde78b"
export STACKIT_BEARER_TOKEN="ey.."
Additionally, you must configure credentials within the ~/.aws/config file.
[profile migration]
aws_access_key_id = AK...
aws_secret_access_key = password
region = eu-north-1
Subsequently, you can initiate the script:
pip install requests
pip install boto3
python migration.py
Commence by creating a file named migration.py and populate it with the following content:
import os
import re
from dataclasses import dataclass
from time import sleep, time
from typing import List, Optional, Tuple

# pip install boto3
import boto3
# pip install requests
import requests
# Connection settings for the STACKIT DNS API. Each value is read from the
# environment and falls back to the placeholder shown here when unset.
stackit_bearer_token = os.environ.get("STACKIT_BEARER_TOKEN", "ey...")
stackit_dns_api_url = os.environ.get(
    "STACKIT_DNS_API_URL", "https://dns.api.stackit.cloud"
)
# shown in the url of portal.stackit.cloud
stackit_project_id = os.environ.get(
    "STACKIT_PROJECT_ID", "0ef237fb-00dd-4cec-a5a4-5855a2dde78b"
)
@dataclass
class Record:
    """One record value inside a record set."""

    # Record data as a plain string; sent to the STACKIT DNS API as "content".
    content: str
@dataclass
class RecordSet:
    """A named group of records that share one type and TTL."""

    name: str  # record set name as reported by the source provider
    records: List[Record]  # the individual record values
    ttl: int  # TTL passed through unchanged to the STACKIT DNS API
    type: str  # DNS record type string, e.g. "NS" or "SOA"

    def __str__(self) -> str:
        """Return a short label containing only the name and the type."""
        return f"RecordSet: (name: {self.name}, type: {self.type})"
@dataclass
class Zone:
    """Provider-independent description of one DNS zone to migrate."""

    contact_email: str  # empty string means "no contact known"
    dns_name: str
    expire_time: int
    name: str
    negative_cache_ttl: int
    primaries: Optional[List[str]]
    refresh_time: int
    retry_time: int
    type: str  # primary or secondary
    record_sets: List[RecordSet]

    def __str__(self) -> str:
        """Return a label with the zone name, DNS name and record sets."""
        return f"Zone: (name: {self.name}, {self.dns_name}, {self.record_sets})"
class AWSZoneFetcher:
    """Read public hosted zones and their record sets from AWS Route 53."""

    def __init__(self):
        # passing credentials as env variables did not work for me. Therefore, I am using a profile
        # keep in mind to set something like this in your ~/.aws/config file:
        # [profile migration]
        # aws_access_key_id = access_key
        # aws_secret_access_key = secret_key
        session = boto3.Session(profile_name=os.getenv("AWS_PROFILE", "migration"))
        self.route53_client = session.client("route53")

    def fetch_zones(self) -> List[Zone]:
        """Return all non-private hosted zones converted to ``Zone`` objects."""
        zones = []
        # Hosted zones are returned in pages; walk them with a paginator (as
        # is done for record sets) so zones beyond the first page are kept.
        paginator = self.route53_client.get_paginator("list_hosted_zones")
        for page in paginator.paginate():
            for hosted_zone in page["HostedZones"]:
                if hosted_zone["Config"]["PrivateZone"]:
                    print(f"Skipping private zone: {hosted_zone['Name']}")
                    continue
                zones.append(self._generate_zone_object(hosted_zone))

        print(f"Successfully fetched zones: {zones}")

        return zones

    def _generate_zone_object(self, hosted_zone) -> Zone:
        """Fetch the record sets of ``hosted_zone`` and build its ``Zone``."""
        soa_data, record_sets = self._fetch_record_sets(hosted_zone)
        return self._create_zone_object(hosted_zone, soa_data, record_sets)

    def _fetch_record_sets(self, hosted_zone) -> Tuple[dict, List[RecordSet]]:
        """Return the parsed apex SOA values and all other record sets.

        The apex NS record set is dropped and the apex SOA record set is
        parsed into a dict instead of being returned as a ``RecordSet``.
        """
        paginator = self.route53_client.get_paginator("list_resource_record_sets")
        response_iterator = paginator.paginate(HostedZoneId=hosted_zone["Id"])

        soa_data = {}
        record_sets = []
        for page in response_iterator:
            for record_set in page["ResourceRecordSets"]:
                if self._should_ignore_record_set(record_set, hosted_zone):
                    continue
                if self._is_soa_record(record_set, hosted_zone):
                    soa_data = self._parse_soa_record(record_set)
                    continue

                # NOTE(review): record sets without "ResourceRecords"
                # (presumably Route 53 alias records) are forwarded with no
                # records and TTL 0 - confirm the target API accepts that.
                records = [
                    Record(r["Value"]) for r in record_set.get("ResourceRecords", [])
                ]
                record_sets.append(
                    RecordSet(
                        name=record_set["Name"],
                        records=records,
                        ttl=record_set.get("TTL", 0),
                        type=record_set["Type"],
                    )
                )

        return soa_data, record_sets

    def _should_ignore_record_set(self, record_set, hosted_zone) -> bool:
        """Return True for the NS record set at the zone apex."""
        return record_set["Type"] == "NS" and record_set["Name"].rstrip(
            "."
        ) == hosted_zone["Name"].rstrip(".")

    def _is_soa_record(self, record_set, hosted_zone) -> bool:
        """Return True for an apex SOA record set that carries record data."""
        return (
            record_set["Type"] == "SOA"
            and record_set["Name"].rstrip(".") == hosted_zone["Name"].rstrip(".")
            and "ResourceRecords" in record_set
        )

    def _parse_soa_record(self, record_set) -> dict:
        """Split the SOA value into the fields needed to create a zone.

        The whitespace-separated fields are read by position: index 1 is the
        responsible mailbox, indexes 3-6 are refresh, retry, expire and
        negative-cache TTL. A mailbox that does not convert to a valid e-mail
        address is returned as an empty string.
        """
        soa_record = record_set["ResourceRecords"][0]["Value"].split()
        contact_email = self._un_reformat_mail(soa_record[1])
        if not self._is_valid_email(contact_email):
            contact_email = ""
        return {
            "contact_email": contact_email,
            "refresh_time": int(soa_record[3]),
            "retry_time": int(soa_record[4]),
            "expire_time": int(soa_record[5]),
            "negative_cache_ttl": int(soa_record[6]),
        }

    def _create_zone_object(self, hosted_zone, soa_data, record_sets) -> Zone:
        """Build a primary ``Zone``; missing SOA values fall back to defaults."""
        return Zone(
            contact_email=soa_data.get("contact_email", ""),
            dns_name=hosted_zone["Name"].rstrip("."),
            expire_time=soa_data.get("expire_time", 1209600),
            name=hosted_zone["Name"].rstrip("."),
            negative_cache_ttl=soa_data.get("negative_cache_ttl", 60),
            primaries=[],  # Route 53 does not explicitly provide a "primaries" attribute
            refresh_time=soa_data.get("refresh_time", 3600),
            retry_time=soa_data.get("retry_time", 600),
            type="primary",
            record_sets=record_sets,
        )

    def _is_valid_email(self, email) -> bool:
        """Return True if ``email`` matches the simple address pattern."""
        pattern = r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
        return bool(re.match(pattern, email))

    def _un_reformat_mail(self, rname: str) -> str:
        """Convert an SOA mailbox name to an e-mail address.

        In the SOA form the first unescaped dot separates the local part from
        the domain and dots inside the local part are written as ``\\.``.
        Names with no unescaped dot, or with a single-label domain, are
        returned unchanged (minus the trailing dot).
        """
        if not rname:
            return rname

        # Remove the trailing period
        rname = rname.rstrip(".")

        # The "@" belongs at the first dot that is not backslash-escaped
        separator = re.search(r"(?<!\\)\.", rname)
        if separator is None:
            return rname

        local_part = rname[: separator.start()]
        domain = rname[separator.end():]

        # There must be at least two labels after the local part
        if "." not in domain:
            return rname

        return local_part.replace("\\.", ".") + "@" + domain
class StackitDnsApi:
    """Thin wrapper around the STACKIT DNS REST endpoints used by the migration."""

    def __init__(
        self,
        api_url: str,
        project_id: str,
        bearer_token: str,
        timeout: float = 30.0,
    ):
        """Store the connection settings.

        Args:
            api_url: Base URL of the DNS API; paths are appended as "/v1/...".
            project_id: Project id inserted into every request path.
            bearer_token: Token sent in the ``Authorization`` header.
            timeout: Timeout in seconds passed to every ``requests`` call, so
                an unresponsive server cannot block the migration forever.
        """
        self.api_url = api_url
        self.project_id = project_id
        self.headers = {"Authorization": f"Bearer {bearer_token}"}
        self.timeout = timeout

    def _handle_error(self, response) -> None:
        """Raise ``ValueError`` unless the status code is 200 or 202."""
        if response.status_code not in {200, 202}:
            raise ValueError(f"API Error: {response.text}, {response.status_code}")

    def get_zone_by_dns_name(self, dns_name: str) -> List[dict]:
        """Return the active zones of the project whose DNS name equals ``dns_name``."""
        response = requests.get(
            f"{self.api_url}/v1/projects/{self.project_id}/zones?dnsName[eq]={dns_name}&active[eq]=true",
            headers=self.headers,
            timeout=self.timeout,
        )
        self._handle_error(response)
        return response.json()["zones"]

    def create_zone(self, payload: dict) -> dict:
        """Create a zone from ``payload`` and return the "zone" object of the reply."""
        response = requests.post(
            f"{self.api_url}/v1/projects/{self.project_id}/zones",
            headers=self.headers,
            json=payload,
            timeout=self.timeout,
        )
        self._handle_error(response)

        response_zone = response.json()["zone"]

        print(f"successfully created zone: {response_zone['dnsName']}")

        return response_zone

    def get_zone_by_id(self, zone_id: str) -> dict:
        """Return the full JSON reply for the zone with id ``zone_id``."""
        response = requests.get(
            f"{self.api_url}/v1/projects/{self.project_id}/zones/{zone_id}",
            headers=self.headers,
            timeout=self.timeout,
        )
        self._handle_error(response)
        return response.json()

    def create_resource_record_set(self, zone_id: str, payload: dict) -> None:
        """Create one record set in zone ``zone_id``; raises on an API error."""
        response = requests.post(
            f"{self.api_url}/v1/projects/{self.project_id}/zones/{zone_id}/rrsets",
            headers=self.headers,
            json=payload,
            timeout=self.timeout,
        )
        self._handle_error(response)
        print(f"successfully created record: {payload['name']}")
class ZoneManager:
    """Create zones through the STACKIT DNS API and wait until they are ready."""

    def __init__(self, api: StackitDnsApi):
        self.api = api

    def create_zone(self, zone: Zone) -> str:
        """Return the id of the zone for ``zone.dns_name``, creating it if needed.

        An already existing active zone is reused. Raises ``ValueError`` when a
        newly created zone does not reach the succeeded state in time.
        """
        existing = self.api.get_zone_by_dns_name(zone.dns_name)
        if len(existing) > 0:
            return existing[0]["id"]

        request_body = {
            "dnsName": zone.dns_name,
            "expireTime": zone.expire_time,
            "name": zone.name,
            "negativeCache": zone.negative_cache_ttl,
            "primaries": zone.primaries,
            "refreshTime": zone.refresh_time,
            "retryTime": zone.retry_time,
            "type": zone.type,
        }
        # the contact is only sent when one is known
        if len(zone.contact_email) > 0:
            request_body["contactEmail"] = zone.contact_email

        zone_id = self.api.create_zone(request_body)["id"]

        # Check zone creation status
        if not self._check_zone_creation_status(zone_id):
            print("Zone creation did not succeed within the timeout.")
            raise ValueError("Zone creation did not succeed within the timeout.")

        print(f"Zone in creation succeeded state: {zone.dns_name}")

        return zone_id

    def _check_zone_creation_status(self, zone_id: str) -> bool:
        """Poll the zone state for up to 30 seconds; True once it succeeded."""
        started = time()

        while time() - started < 30:
            state = self.api.get_zone_by_id(zone_id)["zone"]["state"]
            if state == "CREATE_SUCCEEDED":
                return True

            print(f"Zone creation state: {state}. Retrying in 1 second...")
            sleep(1)  # Wait for 1 second before retrying

        return False
class RecordManager:
    """Create the record sets of a migrated zone through the STACKIT DNS API."""

    def __init__(self, api: StackitDnsApi):
        self.api = api

    def create_record_sets(self, zone: Zone, zone_id: str) -> None:
        """Create every record set of ``zone`` except the apex NS and SOA sets."""
        apex_name = f"{zone.dns_name}."
        for entry in zone.record_sets:
            # skip soa and ns records that are the same as the zone dns name
            is_apex_meta = entry.type in {"NS", "SOA"} and entry.name == apex_name
            if not is_apex_meta:
                self.create_record_set(entry, zone_id)

    def create_record_set(self, record_set: RecordSet, zone_id: str) -> None:
        """Send one record set to the API as a name/records/ttl/type payload."""
        contents = [{"content": item.content} for item in record_set.records]
        self.api.create_resource_record_set(
            zone_id,
            {
                "name": record_set.name,
                "records": contents,
                "ttl": record_set.ttl,
                "type": record_set.type,
            },
        )
if __name__ == "__main__":
    zones = AWSZoneFetcher().fetch_zones()

    api = StackitDnsApi(stackit_dns_api_url, stackit_project_id, stackit_bearer_token)
    zone_manager = ZoneManager(api)
    record_manager = RecordManager(api)

    for zone in zones:
        print(f"Attempt to migrate zone: {zone.dns_name}")
        try:
            zone_id = zone_manager.create_zone(zone)
            record_manager.create_record_sets(zone, zone_id)
        except Exception as e:
            print(f"Error migrating zone: {zone.dns_name}, {e}")

The procedure to transfer zones from Azure to STACKIT DNS is as follows: Initially, it is necessary to establish specific environment variables, followed by the execution of a Python script.
export STACKIT_PROJECT_ID="<project-id>"
export STACKIT_BEARER_TOKEN="ey.."
export AZURE_SUBSCRIPTION_ID="<subscription-id>"
az login
Subsequently, you can initiate the script:
pip install requests
pip install azure-identity
pip install azure-mgmt-dns
python migration.py
Commence by creating a file named migration.py and populate it with the following content:
import osimport refrom dataclasses import dataclassfrom time import sleep, timefrom typing import Optional, List
# pip install requestsimport requests
# install lib: pip install azure-identityfrom azure.identity import DefaultAzureCredential
# install lib: pip install azure-mgmt-dnsfrom azure.mgmt.dns import DnsManagementClient
# Connection settings for the STACKIT DNS API. Each value is read from the
# environment and falls back to the placeholder shown here when unset.
stackit_bearer_token = os.environ.get("STACKIT_BEARER_TOKEN", "ey...")
stackit_dns_api_url = os.environ.get(
    "STACKIT_DNS_API_URL", "https://dns.api.stackit.cloud"
)
# shown in the url of portal.stackit.cloud
stackit_project_id = os.environ.get(
    "STACKIT_PROJECT_ID", "0ef237fb-00dd-4cec-a5a4-5855a2dde78b"
)

# subscription id where the zone(s) are located
SUBSCRIPTION_ID = os.environ.get(
    "AZURE_SUBSCRIPTION_ID", "08c8c34f-7a3c-4b25-8cf9-a9f7d38017a1"
)

# authenticate with: az login
credential = DefaultAzureCredential()

# client used by the zone fetcher to list zones and record sets
dns_client = DnsManagementClient(credential, SUBSCRIPTION_ID)
@dataclass
class Record:
    """One record value inside a record set."""

    # Record data as a plain string; sent to the STACKIT DNS API as "content".
    content: str
@dataclass
class RecordSet:
    """A named group of records that share one type and TTL."""

    name: str  # record set name as reported by the source provider
    records: List[Record]  # the individual record values
    ttl: int  # TTL passed through unchanged to the STACKIT DNS API
    type: str  # DNS record type string, e.g. "NS" or "SOA"

    def __str__(self) -> str:
        """Return a short label containing only the name and the type."""
        return f"RecordSet: (name: {self.name}, type: {self.type})"
@dataclass
class Zone:
    """Provider-independent description of one DNS zone to migrate."""

    contact_email: str  # empty string means "no contact known"
    dns_name: str
    expire_time: int
    name: str
    negative_cache_ttl: int
    primaries: Optional[List[str]]
    refresh_time: int
    retry_time: int
    type: str  # primary or secondary
    record_sets: List[RecordSet]

    def __str__(self) -> str:
        """Return a label with the zone name, DNS name and record sets."""
        return f"Zone: (name: {self.name}, {self.dns_name}, {self.record_sets})"
class AzureZoneFetcher:
    """Read public DNS zones and their record sets from Azure DNS."""

    def __init__(self, dns_client: DnsManagementClient):
        self.dns_client = dns_client

    def fetch_zones(self) -> List[Zone]:
        """Return every non-private zone listed by the client as a ``Zone``."""
        zones = []
        dns_zones = self.dns_client.zones.list()
        for zone in dns_zones:
            # ignore private zones
            if zone.zone_type == "Private":
                print(f"Skipping private zone: {zone.name}")
                continue
            zones.append(self._generate_zone_object(zone))

        print(f"successfully fetched zones: {zones}")

        return zones

    def _generate_zone_object(self, zone) -> Zone:
        """Load the record sets of ``zone`` and convert it to a ``Zone``."""
        resource_group_name = self._extract_resource_group_name(zone)
        # The listing is scanned twice below (SOA lookup, then record
        # extraction). Materialize it first: a one-shot paged iterator would
        # otherwise be partially consumed by the SOA scan, and the second
        # pass would miss record sets.
        record_sets = list(
            self.dns_client.record_sets.list_all_by_dns_zone(
                resource_group_name, zone.name
            )
        )

        zone_data = self._extract_soa_record(zone, record_sets)
        record_sets_list = self._extract_record_sets(zone, record_sets)

        # Accept the SOA contact as-is when it already is an e-mail address,
        # otherwise try to convert it from the dotted SOA form; drop it when
        # neither yields a valid address.
        contact_email = zone_data.get("contact_email", "")
        if not self._is_valid_email(contact_email):
            contact_email = self._un_reformat_mail(contact_email)
            if not self._is_valid_email(contact_email):
                contact_email = ""

        return Zone(
            contact_email=contact_email,
            dns_name=zone.name,
            expire_time=zone_data.get("expire_time", 1209600),
            name=zone.name,
            negative_cache_ttl=zone_data.get("negative_cache_ttl", 60),
            primaries=[],
            refresh_time=zone_data.get("refresh_time", 3600),
            retry_time=zone_data.get("retry_time", 600),
            type="primary",
            record_sets=record_sets_list,
        )

    def _extract_resource_group_name(self, zone) -> str:
        """Return the fifth "/"-separated segment of the zone's resource id."""
        return zone.id.split("/")[4]

    def _extract_soa_record(self, zone, record_sets) -> dict:
        """Return the apex SOA values as a dict, or ``{}`` if none is found."""
        for record_set in record_sets:
            if record_set.soa_record and record_set.fqdn == zone.name + ".":
                soa = record_set.soa_record
                return {
                    "contact_email": soa.email,
                    "expire_time": soa.expire_time,
                    "negative_cache_ttl": soa.minimum_ttl,
                    "refresh_time": soa.refresh_time,
                    "retry_time": soa.retry_time,
                }

        return {}

    def _extract_record_sets(self, zone, record_sets) -> List[RecordSet]:
        """Convert all record sets except the apex SOA/NS sets."""
        record_sets_list = []
        for record_set in record_sets:
            # skip soa and ns records that are the same as the zone dns name
            if record_set.fqdn == zone.name + "." and (
                record_set.soa_record or record_set.ns_records
            ):
                continue
            records = self._extract_records(record_set)
            record_sets_list.append(
                RecordSet(
                    name=record_set.fqdn,
                    records=records,
                    ttl=record_set.ttl,
                    # the type is the last "/"-separated part of the reported type
                    type=record_set.type.split("/")[-1],
                )
            )

        return record_sets_list

    def _extract_records(self, record_set) -> List[Record]:
        """Collect the records of every supported type present on ``record_set``."""
        records = []
        for record_type in [
            "a_records",
            "aaaa_records",
            "mx_records",
            "ns_records",
            "ptr_records",
            "srv_records",
            "txt_records",
            "cname_record",
            "caa_records",
        ]:
            record_data = getattr(record_set, record_type, None)
            if record_data:
                # single-valued attributes are wrapped so one loop handles both
                if not isinstance(record_data, list):
                    record_data = [record_data]
                for rec in record_data:
                    record_content = self._get_record_content(rec)
                    if record_content:
                        records.append(Record(content=record_content))
        return records

    def _get_record_content(self, record) -> Optional[str]:
        """Render one provider record as a content string.

        The record kind is recognised by the attributes it carries. Returns
        ``None`` when no known attribute combination is present.
        """
        if hasattr(record, "ipv4_address"):
            return record.ipv4_address
        elif hasattr(record, "ipv6_address"):
            return record.ipv6_address
        elif hasattr(record, "preference") and hasattr(record, "exchange"):
            return f"{record.preference} {record.exchange}"
        elif hasattr(record, "nsdname"):
            return record.nsdname
        elif hasattr(record, "ptrdname"):
            return record.ptrdname
        elif (
            hasattr(record, "priority")
            and hasattr(record, "weight")
            and hasattr(record, "port")
            and hasattr(record, "target")
        ):
            return f"{record.priority} {record.weight} {record.port} {record.target}"
        elif (
            hasattr(record, "flags")
            and hasattr(record, "tag")
            and hasattr(record, "value")
        ):
            # Must be tested before the plain "value" branch below, which
            # would otherwise match first and join the characters of the
            # value string with spaces.
            # NOTE(review): confirm this text form is what the target API
            # expects for CAA content.
            return f"Flags={record.flags} Tag={record.tag} Value={record.value}"
        elif hasattr(record, "value"):
            # presumably a list of text chunks - joined with single spaces
            return " ".join(record.value)
        elif hasattr(record, "cname"):
            return record.cname
        else:
            return None

    def _is_valid_email(self, email):
        """Return True if ``email`` matches the simple address pattern."""
        pattern = r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
        return bool(re.match(pattern, email))

    def _un_reformat_mail(self, rname: str) -> str:
        """Convert an SOA mailbox name to an e-mail address.

        In the SOA form the first unescaped dot separates the local part from
        the domain and dots inside the local part are written as ``\\.``.
        Names with no unescaped dot, or with a single-label domain, are
        returned unchanged (minus the trailing dot).
        """
        if not rname:
            return rname

        # Remove the trailing period
        rname = rname.rstrip(".")

        # The "@" belongs at the first dot that is not backslash-escaped
        separator = re.search(r"(?<!\\)\.", rname)
        if separator is None:
            return rname

        local_part = rname[: separator.start()]
        domain = rname[separator.end():]

        # There must be at least two labels after the local part
        if "." not in domain:
            return rname

        return local_part.replace("\\.", ".") + "@" + domain
class StackitDnsApi:
    """Thin wrapper around the STACKIT DNS REST endpoints used by the migration."""

    def __init__(
        self,
        api_url: str,
        project_id: str,
        bearer_token: str,
        timeout: float = 30.0,
    ):
        """Store the connection settings.

        Args:
            api_url: Base URL of the DNS API; paths are appended as "/v1/...".
            project_id: Project id inserted into every request path.
            bearer_token: Token sent in the ``Authorization`` header.
            timeout: Timeout in seconds passed to every ``requests`` call, so
                an unresponsive server cannot block the migration forever.
        """
        self.api_url = api_url
        self.project_id = project_id
        self.headers = {"Authorization": f"Bearer {bearer_token}"}
        self.timeout = timeout

    def _handle_error(self, response) -> None:
        """Raise ``ValueError`` unless the status code is 200 or 202."""
        if response.status_code not in {200, 202}:
            raise ValueError(f"API Error: {response.text}, {response.status_code}")

    def get_zone_by_dns_name(self, dns_name: str) -> List[dict]:
        """Return the active zones of the project whose DNS name equals ``dns_name``."""
        response = requests.get(
            f"{self.api_url}/v1/projects/{self.project_id}/zones?dnsName[eq]={dns_name}&active[eq]=true",
            headers=self.headers,
            timeout=self.timeout,
        )
        self._handle_error(response)
        return response.json()["zones"]

    def create_zone(self, payload: dict) -> dict:
        """Create a zone from ``payload`` and return the "zone" object of the reply."""
        response = requests.post(
            f"{self.api_url}/v1/projects/{self.project_id}/zones",
            headers=self.headers,
            json=payload,
            timeout=self.timeout,
        )
        self._handle_error(response)

        response_zone = response.json()["zone"]

        print(f"successfully created zone: {response_zone['dnsName']}")

        return response_zone

    def get_zone_by_id(self, zone_id: str) -> dict:
        """Return the full JSON reply for the zone with id ``zone_id``."""
        response = requests.get(
            f"{self.api_url}/v1/projects/{self.project_id}/zones/{zone_id}",
            headers=self.headers,
            timeout=self.timeout,
        )
        self._handle_error(response)
        return response.json()

    def create_resource_record_set(self, zone_id: str, payload: dict) -> None:
        """Create one record set in zone ``zone_id``; raises on an API error."""
        response = requests.post(
            f"{self.api_url}/v1/projects/{self.project_id}/zones/{zone_id}/rrsets",
            headers=self.headers,
            json=payload,
            timeout=self.timeout,
        )
        self._handle_error(response)
        print(f"successfully created record: {payload['name']}")
class ZoneManager:
    """Create zones through the STACKIT DNS API and wait until they are ready."""

    def __init__(self, api: StackitDnsApi):
        self.api = api

    def create_zone(self, zone: Zone) -> str:
        """Return the id of the zone for ``zone.dns_name``, creating it if needed.

        An already existing active zone is reused. Raises ``ValueError`` when a
        newly created zone does not reach the succeeded state in time.
        """
        existing = self.api.get_zone_by_dns_name(zone.dns_name)
        if len(existing) > 0:
            return existing[0]["id"]

        request_body = {
            "dnsName": zone.dns_name,
            "expireTime": zone.expire_time,
            "name": zone.name,
            "negativeCache": zone.negative_cache_ttl,
            "primaries": zone.primaries,
            "refreshTime": zone.refresh_time,
            "retryTime": zone.retry_time,
            "type": zone.type,
        }
        # the contact is only sent when one is known
        if len(zone.contact_email) > 0:
            request_body["contactEmail"] = zone.contact_email

        zone_id = self.api.create_zone(request_body)["id"]

        # Check zone creation status
        if not self._check_zone_creation_status(zone_id):
            print("Zone creation did not succeed within the timeout.")
            raise ValueError("Zone creation did not succeed within the timeout.")

        print(f"Zone in creation succeeded state: {zone.dns_name}")

        return zone_id

    def _check_zone_creation_status(self, zone_id: str) -> bool:
        """Poll the zone state for up to 30 seconds; True once it succeeded."""
        started = time()

        while time() - started < 30:
            state = self.api.get_zone_by_id(zone_id)["zone"]["state"]
            if state == "CREATE_SUCCEEDED":
                return True

            print(f"Zone creation state: {state}. Retrying in 1 second...")
            sleep(1)  # Wait for 1 second before retrying

        return False
class RecordManager:
    """Create the record sets of a migrated zone through the STACKIT DNS API."""

    def __init__(self, api: StackitDnsApi):
        self.api = api

    def create_record_sets(self, zone: Zone, zone_id: str) -> None:
        """Create every record set of ``zone`` except the apex NS and SOA sets."""
        apex_name = f"{zone.dns_name}."
        for entry in zone.record_sets:
            # skip soa and ns records that are the same as the zone dns name
            is_apex_meta = entry.type in {"NS", "SOA"} and entry.name == apex_name
            if not is_apex_meta:
                self.create_record_set(entry, zone_id)

    def create_record_set(self, record_set: RecordSet, zone_id: str) -> None:
        """Send one record set to the API as a name/records/ttl/type payload."""
        contents = [{"content": item.content} for item in record_set.records]
        self.api.create_resource_record_set(
            zone_id,
            {
                "name": record_set.name,
                "records": contents,
                "ttl": record_set.ttl,
                "type": record_set.type,
            },
        )
if __name__ == "__main__":
    zone_fetcher = AzureZoneFetcher(dns_client)
    zones = zone_fetcher.fetch_zones()

    api = StackitDnsApi(stackit_dns_api_url, stackit_project_id, stackit_bearer_token)
    zone_manager = ZoneManager(api)
    record_manager = RecordManager(api)

    for zone in zones:
        print(f"Attempt to migrate zone: {zone.dns_name}")
        try:
            zone_id = zone_manager.create_zone(zone)
            record_manager.create_record_sets(zone, zone_id)
        except Exception as e:
            print(f"Error migrating zone: {zone.dns_name}, {e}")

The procedure to transfer zones from Google DNS to STACKIT DNS is as follows: Initially, it is necessary to create the zone within STACKIT DNS and establish specific environment variables, followed by the execution of a Python script.
export STACKIT_PROJECT_ID="<project-id>"
export STACKIT_BEARER_TOKEN="ey.."
export GCLOUD_PROJECT_ID="<google-project-id>"
gcloud auth application-default login
Subsequently, you can initiate the script:
pip install requests
pip install --upgrade google-cloud-dns
python migration.py
Commence by creating a file named migration.py and populate it with the following content:
import osimport refrom dataclasses import dataclassfrom time import sleep, timefrom typing import Optional, List, Tuple
# pip install requestsimport requests# pip install --upgrade google-cloud-dnsfrom google.cloud import dns
# Connection settings for the STACKIT DNS API. Each value is read from the
# environment and falls back to the placeholder shown here when unset.
stackit_bearer_token = os.environ.get("STACKIT_BEARER_TOKEN", "ey...")
stackit_dns_api_url = os.environ.get(
    "STACKIT_DNS_API_URL", "https://dns.api.stackit.cloud"
)
# shown in the url of portal.stackit.cloud
stackit_project_id = os.environ.get(
    "STACKIT_PROJECT_ID", "0ef237fb-00dd-4cec-a5a4-5855a2dde78b"
)

# authenticate with: gcloud auth application-default login
# install lib: pip install --upgrade google-cloud-dns
# shown in the url of the web portal
gcloud_project_id = os.environ.get("GCLOUD_PROJECT_ID", "")
@dataclass
class Record:
    """One record value inside a record set."""

    # Record data as a plain string; sent to the STACKIT DNS API as "content".
    content: str
@dataclass
class RecordSet:
    """A named group of records that share one type and TTL."""

    name: str  # record set name as reported by the source provider
    records: List[Record]  # the individual record values
    ttl: int  # TTL passed through unchanged to the STACKIT DNS API
    type: str  # DNS record type string, e.g. "NS" or "SOA"

    def __str__(self) -> str:
        """Return a short label containing only the name and the type."""
        return f"RecordSet: (name: {self.name}, type: {self.type})"
@dataclass
class Zone:
    """Provider-independent description of one DNS zone to migrate."""

    contact_email: str  # empty string means "no contact known"
    dns_name: str
    expire_time: int
    name: str
    negative_cache_ttl: int
    primaries: Optional[List[str]]
    refresh_time: int
    retry_time: int
    type: str  # primary or secondary
    record_sets: List[RecordSet]

    def __str__(self) -> str:
        """Return a label with the zone name, DNS name and record sets."""
        return f"Zone: (name: {self.name}, {self.dns_name}, {self.record_sets})"
class GoogleZoneFetcher:
    """Read managed zones and their record sets from Google Cloud DNS."""

    def __init__(self, gcloud_project_id: str):
        self.client = dns.Client(project=gcloud_project_id)

    def fetch_zones(self) -> List[Zone]:
        """Return every managed zone of the project as a ``Zone``."""
        zones = []
        managed_zones = self.client.list_zones()

        for managed_zone in managed_zones:
            # since there is no visibility in the managed_zone object we assume that the zone is public and should be
            # migrated
            zone = self._generate_zone_object(managed_zone)
            zones.append(zone)

        print(f"Successfully fetched zones: {zones}")

        return zones

    def _generate_zone_object(self, managed_zone) -> Zone:
        """Fetch the record sets of ``managed_zone`` and build its ``Zone``."""
        soa_data, record_sets = self._fetch_record_sets(managed_zone)
        return self._create_zone_object(managed_zone, soa_data, record_sets)

    def _fetch_record_sets(self, managed_zone) -> Tuple[dict, List[RecordSet]]:
        """Return the parsed SOA values and all other record sets.

        SOA record sets are parsed into a dict instead of being returned, and
        the NS record set at the zone apex is dropped.
        """
        record_sets = []
        soa_data = {}
        for record_set in managed_zone.list_resource_record_sets():
            if record_set.record_type == "SOA":
                soa_data = self._parse_soa_record(record_set)
                continue
            if record_set.record_type == "NS" and record_set.name.rstrip(
                "."
            ) == managed_zone.dns_name.rstrip("."):
                continue

            records = [Record(rrdata) for rrdata in record_set.rrdatas]
            record_sets.append(
                RecordSet(
                    name=record_set.name,
                    records=records,
                    ttl=record_set.ttl,
                    type=record_set.record_type,
                )
            )
        return soa_data, record_sets

    def _parse_soa_record(self, record_set) -> dict:
        """Split the SOA value into the fields needed to create a zone.

        The whitespace-separated fields are read by position: index 1 is the
        responsible mailbox, indexes 3-6 are refresh, retry, expire and
        negative-cache TTL. A mailbox that does not convert to a valid e-mail
        address is returned as an empty string.
        """
        soa_record = record_set.rrdatas[0].split()
        contact_email = self._un_reformat_mail(soa_record[1])
        if not self._is_valid_email(contact_email):
            contact_email = ""
        return {
            "contact_email": contact_email,
            "refresh_time": int(soa_record[3]),
            "retry_time": int(soa_record[4]),
            "expire_time": int(soa_record[5]),
            "negative_cache_ttl": int(soa_record[6]),
        }

    def _create_zone_object(self, managed_zone, soa_data, record_sets) -> Zone:
        """Build a primary ``Zone``; missing SOA values fall back to defaults."""
        return Zone(
            contact_email=soa_data.get("contact_email", ""),
            dns_name=managed_zone.dns_name.rstrip("."),
            expire_time=soa_data.get("expire_time", 1209600),
            name=managed_zone.name,
            negative_cache_ttl=soa_data.get("negative_cache_ttl", 60),
            primaries=[],  # Google Cloud DNS does not provide a "primaries" attribute
            refresh_time=soa_data.get("refresh_time", 3600),
            retry_time=soa_data.get("retry_time", 600),
            type="primary",
            record_sets=record_sets,
        )

    def _is_valid_email(self, email) -> bool:
        """Return True if ``email`` matches the simple address pattern."""
        pattern = r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
        return bool(re.match(pattern, email))

    def _un_reformat_mail(self, rname: str) -> str:
        """Convert an SOA mailbox name to an e-mail address.

        In the SOA form the first unescaped dot separates the local part from
        the domain and dots inside the local part are written as ``\\.``.
        Names with no unescaped dot, or with a single-label domain, are
        returned unchanged (minus the trailing dot).
        """
        if not rname:
            return rname

        # Remove the trailing period
        rname = rname.rstrip(".")

        # The "@" belongs at the first dot that is not backslash-escaped
        separator = re.search(r"(?<!\\)\.", rname)
        if separator is None:
            return rname

        local_part = rname[: separator.start()]
        domain = rname[separator.end():]

        # There must be at least two labels after the local part
        if "." not in domain:
            return rname

        return local_part.replace("\\.", ".") + "@" + domain
class StackitDnsApi:
    """Thin wrapper around the STACKIT DNS REST endpoints used by the migration."""

    def __init__(
        self,
        api_url: str,
        project_id: str,
        bearer_token: str,
        timeout: float = 30.0,
    ):
        """Store the connection settings.

        Args:
            api_url: Base URL of the DNS API; paths are appended as "/v1/...".
            project_id: Project id inserted into every request path.
            bearer_token: Token sent in the ``Authorization`` header.
            timeout: Timeout in seconds passed to every ``requests`` call, so
                an unresponsive server cannot block the migration forever.
        """
        self.api_url = api_url
        self.project_id = project_id
        self.headers = {"Authorization": f"Bearer {bearer_token}"}
        self.timeout = timeout

    def _handle_error(self, response) -> None:
        """Raise ``ValueError`` unless the status code is 200 or 202."""
        if response.status_code not in {200, 202}:
            raise ValueError(f"API Error: {response.text}, {response.status_code}")

    def get_zone_by_dns_name(self, dns_name: str) -> List[dict]:
        """Return the active zones of the project whose DNS name equals ``dns_name``."""
        response = requests.get(
            f"{self.api_url}/v1/projects/{self.project_id}/zones?dnsName[eq]={dns_name}&active[eq]=true",
            headers=self.headers,
            timeout=self.timeout,
        )
        self._handle_error(response)
        return response.json()["zones"]

    def create_zone(self, payload: dict) -> dict:
        """Create a zone from ``payload`` and return the "zone" object of the reply."""
        response = requests.post(
            f"{self.api_url}/v1/projects/{self.project_id}/zones",
            headers=self.headers,
            json=payload,
            timeout=self.timeout,
        )
        self._handle_error(response)

        response_zone = response.json()["zone"]

        print(f"successfully created zone: {response_zone['dnsName']}")

        return response_zone

    def get_zone_by_id(self, zone_id: str) -> dict:
        """Return the full JSON reply for the zone with id ``zone_id``."""
        response = requests.get(
            f"{self.api_url}/v1/projects/{self.project_id}/zones/{zone_id}",
            headers=self.headers,
            timeout=self.timeout,
        )
        self._handle_error(response)
        return response.json()

    def create_resource_record_set(self, zone_id: str, payload: dict) -> None:
        """Create one record set in zone ``zone_id``; raises on an API error."""
        response = requests.post(
            f"{self.api_url}/v1/projects/{self.project_id}/zones/{zone_id}/rrsets",
            headers=self.headers,
            json=payload,
            timeout=self.timeout,
        )
        self._handle_error(response)
        print(f"successfully created record: {payload['name']}")
class ZoneManager:
    """Create zones through the STACKIT DNS API and wait until they are ready."""

    def __init__(self, api: StackitDnsApi):
        self.api = api

    def create_zone(self, zone: Zone) -> str:
        """Return the id of the zone for ``zone.dns_name``, creating it if needed.

        An already existing active zone is reused. Raises ``ValueError`` when a
        newly created zone does not reach the succeeded state in time.
        """
        existing = self.api.get_zone_by_dns_name(zone.dns_name)
        if len(existing) > 0:
            return existing[0]["id"]

        request_body = {
            "dnsName": zone.dns_name,
            "expireTime": zone.expire_time,
            "name": zone.name,
            "negativeCache": zone.negative_cache_ttl,
            "primaries": zone.primaries,
            "refreshTime": zone.refresh_time,
            "retryTime": zone.retry_time,
            "type": zone.type,
        }
        # the contact is only sent when one is known
        if len(zone.contact_email) > 0:
            request_body["contactEmail"] = zone.contact_email

        zone_id = self.api.create_zone(request_body)["id"]

        # Check zone creation status
        if not self._check_zone_creation_status(zone_id):
            print("Zone creation did not succeed within the timeout.")
            raise ValueError("Zone creation did not succeed within the timeout.")

        print(f"Zone in creation succeeded state: {zone.dns_name}")

        return zone_id

    def _check_zone_creation_status(self, zone_id: str) -> bool:
        """Poll the zone state for up to 30 seconds; True once it succeeded."""
        started = time()

        while time() - started < 30:
            state = self.api.get_zone_by_id(zone_id)["zone"]["state"]
            if state == "CREATE_SUCCEEDED":
                return True

            print(f"Zone creation state: {state}. Retrying in 1 second...")
            sleep(1)  # Wait for 1 second before retrying

        return False
class RecordManager:
    """Create the record sets of a migrated zone through the STACKIT DNS API."""

    def __init__(self, api: StackitDnsApi):
        self.api = api

    def create_record_sets(self, zone: Zone, zone_id: str) -> None:
        """Create every record set of ``zone`` except the apex NS and SOA sets."""
        apex_name = f"{zone.dns_name}."
        for entry in zone.record_sets:
            # skip soa and ns records that are the same as the zone dns name
            is_apex_meta = entry.type in {"NS", "SOA"} and entry.name == apex_name
            if not is_apex_meta:
                self.create_record_set(entry, zone_id)

    def create_record_set(self, record_set: RecordSet, zone_id: str) -> None:
        """Send one record set to the API as a name/records/ttl/type payload."""
        contents = [{"content": item.content} for item in record_set.records]
        self.api.create_resource_record_set(
            zone_id,
            {
                "name": record_set.name,
                "records": contents,
                "ttl": record_set.ttl,
                "type": record_set.type,
            },
        )
if __name__ == "__main__":
    # read everything from Google Cloud DNS first, then write to STACKIT DNS
    zones = GoogleZoneFetcher(gcloud_project_id).fetch_zones()

    api = StackitDnsApi(stackit_dns_api_url, stackit_project_id, stackit_bearer_token)
    zone_manager, record_manager = ZoneManager(api), RecordManager(api)

    for zone in zones:
        print(f"Attempt to migrate zone: {zone.dns_name}")
        # a failure is reported and the loop moves on to the next zone
        try:
            zone_id = zone_manager.create_zone(zone)
            record_manager.create_record_sets(zone, zone_id)
        except Exception as err:
            print(f"Error migrating zone: {zone.dns_name}, {err}")