
| # -*- coding: utf-8 -*-
import time import datetime
import urllib import urllib.parse import urllib.request from pyquery import PyQuery as pq from typing import List
from alibabacloud_ecs20140526.client import Client as Ecs20140526Client from alibabacloud_tea_openapi import models as open_api_models from alibabacloud_ecs20140526 import models as ecs_20140526_models from alibabacloud_tea_util import models as util_models from alibabacloud_tea_util.client import Client as UtilClient
class AutoModSecPolicy: def __init__(self, region_id, endpoint, access_key_id, access_key_secret): self.region_id = region_id self.endpoint = endpoint self.access_key_id = access_key_id self.access_key_secret = access_key_secret self.client = self.create_client()
def create_client(self) -> Ecs20140526Client: """ 使用AK&SK初始化账号Client @param access_key_id: @param access_key_secret: @return: Client @throws Exception """ config = open_api_models.Config( # 您的 AccessKey ID, access_key_id=self.access_key_id, # 您的 AccessKey Secret, access_key_secret=self.access_key_secret ) # 访问的域名 config.endpoint = self.endpoint return Ecs20140526Client(config)
def create_sec_policy(self, source_ip: str, description: str, security_group_id: str, ) -> None: permissions_0 = ecs_20140526_models.AuthorizeSecurityGroupRequestPermissions( policy='accept', priority='1', port_range='-1/-1', ip_protocol='ALL', source_cidr_ip=source_ip, description=description
) authorize_security_group_request = ecs_20140526_models.AuthorizeSecurityGroupRequest( region_id=self.region_id, security_group_id=security_group_id, permissions=[ permissions_0 ] ) runtime = util_models.RuntimeOptions() try: # 复制代码运行请自行打印 API 的返回值 resp = self.client.authorize_security_group_with_options(authorize_security_group_request, runtime) print(resp) except Exception as error: # 如有需要,请打印 error resp = UtilClient.assert_as_string(error.message) print(resp)
def get_sec_group_ip(self, security_group_id: str, description: str, ) -> List: describe_security_group_attribute_request = ecs_20140526_models.DescribeSecurityGroupAttributeRequest( security_group_id=security_group_id, region_id=self.region_id, direction='ingress' ) runtime = util_models.RuntimeOptions() try: # 复制代码运行请自行打印 API 的返回值 resp = self.client.describe_security_group_attribute_with_options(describe_security_group_attribute_request, runtime).to_map() policy_lists = resp['body']['Permissions']['Permission'] # ip_lists = [ x['SourceCidrIp'] for x in policy_lists ] ip_lists = [] nowDate = datetime.datetime.utcnow()
for i in policy_lists: create_time = datetime.datetime.strptime(i['CreateTime'], "%Y-%m-%dT%H:%M:%SZ")
# 删除48小时前策略 if i['Description'] == description and create_time < nowDate - datetime.timedelta(hours=48): ip_lists.append(i['SourceCidrIp']) # print(ip_lists) # print(policy_lists) return ip_lists except Exception as error: # 如有需要,请打印 error UtilClient.assert_as_string(error.message)
def del_sec_group_ip(self, ip_lists: list, security_group_id: str, description: str, ) -> None: if not ip_lists: print("not found ip!") return 0 policy_lists = [ ecs_20140526_models.RevokeSecurityGroupRequestPermissions( policy='accept', priority='1', ip_protocol='ALL', port_range='-1/-1', source_cidr_ip=x, description=description ) for x in ip_lists ]
revoke_security_group_request = ecs_20140526_models.RevokeSecurityGroupRequest( region_id=self.region_id, security_group_id=security_group_id, permissions=policy_lists ) runtime = util_models.RuntimeOptions() try: # 复制代码运行请自行打印 API 的返回值 resp = self.client.revoke_security_group_with_options(revoke_security_group_request, runtime) print(resp) except Exception as error: # 如有需要,请打印 error UtilClient.assert_as_string(error.message)
@staticmethod def get_client_public_ip():
# ip138.com中使用iframe,这里先获得iframe中的src # 每年iframe中的地址会变,比如 2019.ip138.com 2022.ip138.com headers = ("User-Agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/103.0.0.0 Safari/537.36") opener = urllib.request.build_opener() opener.addheaders = [headers] data = opener.open("http://ip138.com") doc = pq(data.read())
# 获得 iframe 标签的 src 属性的值 # 获得出来大概是这样 "//2022.ip138.com/" # 再去掉两头多余的 "/" 就获得到实际的显示地址了 url = "http://" + doc('iframe').eq(0).attr('src').replace('/', '') # print(url) opener.close()
# 获取ip地址 opener = urllib.request.build_opener() opener.addheaders = [headers] data = opener.open(url) doc = pq(data.read().decode('utf8'))
# 取得素有的 <a> 元素 lista = doc('body p a')
# 取得第一个<a> 元素 firstaddr = lista.eq(0).text() # print(firsta) return firstaddr
if __name__ == '__main__': region_id = 'cn-guangzhou' endpoint = f'ecs.cn-guangzhou.aliyuncs.com' access_key_id = 'xxxx' access_key_secret = 'xxxxx'
security_group_id = 'sg-xxxxx' description_by_create = 'auto create by generator'
# 生成对象 auto_obj = AutoModSecPolicy(region_id,endpoint, access_key_id, access_key_secret)
# openapi有接口限流,限制下 time.sleep(1) ip_lists = auto_obj.get_sec_group_ip(security_group_id=security_group_id, description=description_by_create) print('Cleanning IP:',ip_lists)
time.sleep(1) auto_obj.del_sec_group_ip(ip_lists=ip_lists, security_group_id=security_group_id, description=description_by_create)
source_ip = auto_obj.get_client_public_ip() print("Adding IP",source_ip)
auto_obj.create_sec_policy(source_ip=source_ip, security_group_id=security_group_id, description=description_by_create)
|