阿里云api实现ipv6的ddns

创建python的虚拟环境

# 创建一个虚拟环境
python3 -m venv ipv6_py_env

# 激活虚拟环境
source ipv6_py_env/bin/activate

获取ipv6地址的脚本

ifconfig | grep -A 10 wlp2s0 | grep 'inet6' | awk '{print $2}' | sed '/fe80/ d'

将上述脚本使用python封装

res = sp.run("ifconfig | grep -A 10 wlp2s0 | grep 'inet6' | awk '{print $2}' | sed '/fe80/ d'", shell=True, capture_output=True, encoding="utf8")
ipv6_addrs = res.stdout.split() # ipv6地址可能有多个,随便用一个即可

完整的python代码(修改自参考中的示例代码)

# -*- coding: utf-8 -*-
import os
import sys
import time
import subprocess
from loguru import logger as log
log.add("auto_refresh.log")

from typing import List

from alibabacloud_alidns20150109.client import Client as Alidns20150109Client
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_alidns20150109 import models as alidns_20150109_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_tea_util.client import Client as UtilClient


class Refresh:
def __init__(self):
pass

@staticmethod
def create_client(
access_key_id: str,
access_key_secret: str,
) -> Alidns20150109Client:
"""
使用AK&SK初始化账号Client
@param access_key_id:
@param access_key_secret:
@return: Client
@throws Exception
"""
config = open_api_models.Config(
access_key_id=access_key_id,
access_key_secret=access_key_secret
)
# Endpoint 请参考 https://api.aliyun.com/product/Alidns
config.endpoint = f'alidns.cn-hangzhou.aliyuncs.com'
return Alidns20150109Client(config)

@staticmethod
def update_dns(
new_ip: str
) -> bool:
# 请确保代码运行环境设置了环境变量 ALIBABA_CLOUD_ACCESS_KEY_ID 和 ALIBABA_CLOUD_ACCESS_KEY_SECRET。
# 工程代码泄露可能会导致 AccessKey 泄露,并威胁账号下所有资源的安全性。以下代码示例使用环境变量获取 AccessKey 的方式进行调用,仅供参考,建议使用更安全的 STS 方式,更多鉴权访问方式请参见:https://help.aliyun.com/document_detail/378659.html
client = Refresh.create_client("your-ALIBABA_CLOUD_ACCESS_KEY_ID", "your-ALIBABA_CLOUD_ACCESS_KEY_SECRET")
# record_id 可以通过 DescribeDomainRecords 接口拿到, 见 https://next.api.aliyun.com/api/Alidns/2015-01-09/DescribeDomainRecords
update_domain_record_request = alidns_20150109_models.UpdateDomainRecordRequest(
record_id='879766660467119104',
rr='ipv6',
type='AAAA',
value=new_ip
)
runtime = util_models.RuntimeOptions()
try:
# 复制代码运行请自行打印 API 的返回值
client.update_domain_record_with_options(update_domain_record_request, runtime)
return True
except Exception as error:
if "The DNS record already exists" in error.message:
# 说明该条记录已经存在,则无需更新
return True
log.error(error.message)
# 诊断地址
log.error(error.data.get("Recommend"))
UtilClient.assert_as_string(error.message)
return False
return False

if __name__ == '__main__':
# 检测当前ip是否和上次更新的ip一直,如果不一致则更新dns
pre_ip, cur_ip = None, None
while True:
try:
res = subprocess.run("ifconfig | grep -A 10 wlp2s0 | grep 'inet6' | awk '{print $2}' | sed '/fe80/ d'", shell=True, capture_output=True, encoding="utf8")
if not (res and res.stdout and res.stdout.split()):
raise Exception(f"empty ipv6 address, {res and res.stdout}")
except Exception as e:
log.error(f"get ip error, {e}")
time.sleep(30)
continue

ipv6_addrs = res.stdout.split()
if pre_ip in ipv6_addrs:
time.sleep(1)
continue
else:
cur_ip = ipv6_addrs[-1]
updated = Refresh.update_dns(cur_ip)
if updated:
log.info(f"updated dns, pre: {pre_ip}, new: {cur_ip}")
pre_ip = cur_ip
time.sleep(30)
continue
else:
log.error("update dns fail")
time.sleep(30)
continue

最后执行该文件即可

python auto_refresh_dns.py

参考:

https://next.api.aliyun.com/api/Alidns/2015-01-09/UpdateDomainRecord

上面的api需要传入 RecordId,可以通过下面的api获取

https://next.api.aliyun.com/api/Alidns/2015-01-09/DescribeDomainRecords

ALIBABA_CLOUD_ACCESS_KEY_ID 和 ALIBABA_CLOUD_ACCESS_KEY_SECRET 在阿里云控制台可查看

Leave a Comment