Skip to content

Transfer zones from alternative providers to STACKIT DNS

This section presents a series of instructional guides detailing the process of transitioning zones and associated resource records from alternative service providers to the STACKIT DNS environment.

The procedure to transfer zones from AWS Route53 to STACKIT DNS is as follows: Initially, it is necessary to establish specific environment variables, followed by the execution of a Python script.

Terminal window
export STACKIT_PROJECT_ID="0ef237fb-00dd-4cec-a5a4-5855a2dde78b"
export STACKIT_BEARER_TOKEN="ey.."

Additionally, it is imperative to configure credentials within the ~/.aws/config file.

[profile migration]
aws_access_key_id = AK...
aws_secret_access_key = password
region = eu-north-1

Subsequently, you can initiate the script:

Terminal window
pip install requests
pip install boto3
python migration.py

Commence by creating a file named migration.py and populate it with the subsequent content:

import os
import re
from dataclasses import dataclass
from time import sleep, time
from typing import Optional, List
# pip install requests
import requests
# pip install boto3
import boto3
stackit_bearer_token = os.getenv("STACKIT_BEARER_TOKEN", "ey...")
stackit_dns_api_url = os.getenv("STACKIT_DNS_API_URL", "https://dns.api.stackit.cloud")
stackit_project_id = os.getenv(
"STACKIT_PROJECT_ID", "0ef237fb-00dd-4cec-a5a4-5855a2dde78b"
) # shown in the url of portal.stackit.cloud
@dataclass
class Record:
content: str
@dataclass
class RecordSet:
name: str
records: List[Record]
ttl: int
type: str
def __str__(self):
return f"RecordSet: (name: {self.name}, type: {self.type})"
@dataclass
class Zone:
contact_email: str
dns_name: str
expire_time: int
name: str
negative_cache_ttl: int
primaries: Optional[List[str]]
refresh_time: int
retry_time: int
type: str # primary or secondary
record_sets: List[RecordSet]
def __str__(self):
return f"Zone: (name: {self.name}, {self.dns_name}, {self.record_sets})"
class AWSZoneFetcher:
def __init__(self):
# passing credentials as env variables did not work for me. Therefore, I am using a profile
# keep in mind to set something like this in your ~/.aws/config file:
# [profile migration]
# aws_access_key_id = access_key
# aws_secret_access_key = secret_key
session = boto3.Session(profile_name=os.getenv("AWS_PROFILE", "migration"))
self.route53_client = session.client("route53")
def fetch_zones(self) -> List[Zone]:
zones = []
hosted_zones = self.route53_client.list_hosted_zones()["HostedZones"]
for hosted_zone in hosted_zones:
if hosted_zone["Config"]["PrivateZone"]:
print(f"Skipping private zone: {hosted_zone['Name']}")
continue
zone = self._generate_zone_object(hosted_zone)
zones.append(zone)
print(f"Successfully fetched zones: {zones}")
return zones
def _generate_zone_object(self, hosted_zone) -> Zone:
soa_data, record_sets = self._fetch_record_sets(hosted_zone)
return self._create_zone_object(hosted_zone, soa_data, record_sets)
def _fetch_record_sets(self, hosted_zone) -> Tuple[dict, List[RecordSet]]:
paginator = self.route53_client.get_paginator("list_resource_record_sets")
response_iterator = paginator.paginate(HostedZoneId=hosted_zone["Id"])
soa_data = {}
record_sets = []
for page in response_iterator:
for record_set in page["ResourceRecordSets"]:
if self._should_ignore_record_set(record_set, hosted_zone):
continue
if self._is_soa_record(record_set, hosted_zone):
soa_data = self._parse_soa_record(record_set)
continue
records = [
Record(r["Value"]) for r in record_set.get("ResourceRecords", [])
]
record_sets.append(
RecordSet(
name=record_set["Name"],
records=records,
ttl=record_set.get("TTL", 0),
type=record_set["Type"],
)
)
return soa_data, record_sets
def _should_ignore_record_set(self, record_set, hosted_zone) -> bool:
return record_set["Type"] == "NS" and record_set["Name"].rstrip(
"."
) == hosted_zone["Name"].rstrip(".")
def _is_soa_record(self, record_set, hosted_zone) -> bool:
return (
record_set["Type"] == "SOA"
and record_set["Name"].rstrip(".") == hosted_zone["Name"].rstrip(".")
and "ResourceRecords" in record_set
)
def _parse_soa_record(self, record_set) -> dict:
soa_record = record_set["ResourceRecords"][0]["Value"].split()
return {
"contact_email": self._un_reformat_mail(soa_record[1]),
"refresh_time": int(soa_record[3]),
"retry_time": int(soa_record[4]),
"expire_time": int(soa_record[5]),
"negative_cache_ttl": int(soa_record[6]),
}
def _create_zone_object(self, hosted_zone, soa_data, record_sets) -> Zone:
return Zone(
contact_email=soa_data.get("contact_email", ""),
dns_name=hosted_zone["Name"].rstrip("."),
expire_time=soa_data.get("expire_time", 1209600),
name=hosted_zone["Name"].rstrip("."),
negative_cache_ttl=soa_data.get("negative_cache_ttl", 60),
primaries=[], # Route 53 does not explicitly provide a "primaries" attribute
refresh_time=soa_data.get("refresh_time", 3600),
retry_time=soa_data.get("retry_time", 600),
type="primary",
record_sets=record_sets,
)
def _is_valid_email(self, email) -> bool:
pattern = r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$"
return bool(re.match(pattern, email))
def _un_reformat_mail(self, rname: str) -> str:
if not rname:
return rname
# Remove the trailing period
rname = rname.rstrip(".")
# Split the string by period
split_rname = rname.split(".")
# Check if there are enough parts to replace the second last. There must be at least 3 parts.
if len(split_rname) < 3:
return rname
# Replace the second last item
split_rname[-2] = "@" + split_rname[-2]
# Construct the result
result = ".".join(split_rname[:-2]) + split_rname[-2] + "." + split_rname[-1]
return result
class StackitDnsApi:
def __init__(self, api_url: str, project_id: str, bearer_token: str):
self.api_url = api_url
self.project_id = project_id
self.headers = {"Authorization": f"Bearer {bearer_token}"}
def _handle_error(self, response) -> None:
if response.status_code not in {200, 202}:
raise ValueError(f"API Error: {response.text}, {response.status_code}")
def get_zone_by_dns_name(self, dns_name: str) -> List[dict]:
response = requests.get(
f"{self.api_url}/v1/projects/{self.project_id}/zones?dnsName[eq]={dns_name}&active[eq]=true",
headers=self.headers,
)
self._handle_error(response)
return response.json()["zones"]
def create_zone(self, payload: dict) -> dict:
response = requests.post(
f"{self.api_url}/v1/projects/{self.project_id}/zones",
headers=self.headers,
json=payload,
)
self._handle_error(response)
response_zone = response.json()["zone"]
print(f"successfully created zone: {response_zone['dnsName']}")
return response_zone
def get_zone_by_id(self, zone_id: str) -> dict:
response = requests.get(
f"{self.api_url}/v1/projects/{self.project_id}/zones/{zone_id}",
headers=self.headers,
)
self._handle_error(response)
return response.json()
def create_resource_record_set(self, zone_id: str, payload: dict) -> None:
response = requests.post(
f"{self.api_url}/v1/projects/{self.project_id}/zones/{zone_id}/rrsets",
headers=self.headers,
json=payload,
)
self._handle_error(response)
print(f"successfully created record: {payload['name']}")
class ZoneManager:
def __init__(self, api: StackitDnsApi):
self.api = api
def create_zone(self, zone: Zone) -> str:
zones = self.api.get_zone_by_dns_name(zone.dns_name)
if len(zones) > 0:
return zones[0]["id"]
payload = {
"dnsName": zone.dns_name,
"expireTime": zone.expire_time,
"name": zone.name,
"negativeCache": zone.negative_cache_ttl,
"primaries": zone.primaries,
"refreshTime": zone.refresh_time,
"retryTime": zone.retry_time,
"type": zone.type,
}
if len(zone.contact_email) > 0:
payload.update({"contactEmail": zone.contact_email})
response_zone = self.api.create_zone(payload)
zone_id = response_zone["id"]
# Check zone creation status
success = self._check_zone_creation_status(zone_id)
if not success:
print("Zone creation did not succeed within the timeout.")
raise ValueError("Zone creation did not succeed within the timeout.")
print(f"Zone in creation succeeded state: {zone.dns_name}")
return zone_id
def _check_zone_creation_status(self, zone_id: str) -> bool:
timeout = 30
start_time = time()
while time() - start_time < timeout:
zone_data = self.api.get_zone_by_id(zone_id)
state = zone_data["zone"]["state"]
if state == "CREATE_SUCCEEDED":
return True
else:
print(f"Zone creation state: {state}. Retrying in 1 second...")
sleep(1) # Wait for 1 second before retrying
return False
class RecordManager:
def __init__(self, api: StackitDnsApi):
self.api = api
def create_record_sets(self, zone: Zone, zone_id: str) -> None:
for record_set in zone.record_sets:
# skip soa and ns records that are the same as the zone dns name
if (
record_set.type in {"NS", "SOA"}
and record_set.name == f"{zone.dns_name}."
):
continue
self.create_record_set(record_set, zone_id)
def create_record_set(self, record_set: RecordSet, zone_id: str) -> None:
payload = {
"name": record_set.name,
"records": [
{"content": record_data.content} for record_data in record_set.records
],
"ttl": record_set.ttl,
"type": record_set.type,
}
self.api.create_resource_record_set(zone_id, payload)
if __name__ == "__main__":
zones = AWSZoneFetcher().fetch_zones()
api = StackitDnsApi(stackit_dns_api_url, stackit_project_id, stackit_bearer_token)
zone_manager = ZoneManager(api)
record_manager = RecordManager(api)
for zone in zones:
print(f"Attempt to migrate zone: {zone.dns_name}")
try:
zone_id = zone_manager.create_zone(zone)
record_manager.create_record_sets(zone, zone_id)
except Exception as e:
print(f"Error migrating zone: {zone.dns_name}, {e}")