1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202
| # -*- coding: utf-8 -*-
import time import datetime
import urllib import urllib.parse import urllib.request from pyquery import PyQuery as pq from typing import List
from alibabacloud_ecs20140526.client import Client as Ecs20140526Client from alibabacloud_tea_openapi import models as open_api_models from alibabacloud_ecs20140526 import models as ecs_20140526_models from alibabacloud_tea_util import models as util_models from alibabacloud_tea_util.client import Client as UtilClient
class AutoModSecPolicy:
    """Maintain temporary allow-all ingress rules on an ECS security group.

    The class wraps an Alibaba Cloud ECS OpenAPI client and offers three
    operations on one security group: add an ``accept`` rule for a source
    IP, list the source IPs of rules that carry a given description and
    are older than ``RULE_MAX_AGE_HOURS``, and revoke rules for a list of
    source IPs.  It also provides a helper that looks up the caller's
    public IP via ip138.com.

    API failures are reported with ``print`` and do not propagate; this
    keeps the best-effort behaviour of the original script.
    """

    # Rules older than this many hours are selected for removal.
    RULE_MAX_AGE_HOURS = 48
    # Layout of the ``CreateTime`` strings parsed by this class.
    CREATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%SZ"
    # Landing page whose first iframe points at the real IP-echo host.
    IP_LOOKUP_URL = "http://ip138.com"
    # Seconds to wait on each HTTP request before giving up.
    HTTP_TIMEOUT = 10
    USER_AGENT = ("Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                  "AppleWebKit/537.36 (KHTML, like Gecko) "
                  "Chrome/103.0.0.0 Safari/537.36")

    def __init__(self, region_id, endpoint, access_key_id, access_key_secret):
        """Store the connection settings and build the ECS client.

        Args:
            region_id: Region passed on every request.
            endpoint: API endpoint host name the client talks to.
            access_key_id: AccessKey ID used to authenticate.
            access_key_secret: AccessKey secret used to authenticate.
        """
        self.region_id = region_id
        self.endpoint = endpoint
        self.access_key_id = access_key_id
        self.access_key_secret = access_key_secret
        self.client = self.create_client()

    def create_client(self) -> "Ecs20140526Client":
        """Build an ECS client from the stored AccessKey pair and endpoint.

        Returns:
            A new ``Ecs20140526Client`` configured with ``self.endpoint``.
        """
        config = open_api_models.Config(
            access_key_id=self.access_key_id,
            access_key_secret=self.access_key_secret,
        )
        # Host name the client sends its requests to.
        config.endpoint = self.endpoint
        return Ecs20140526Client(config)

    @staticmethod
    def _error_text(error) -> str:
        """Return a printable message for *error*.

        Uses the exception's ``message`` attribute when it is a non-empty
        string and falls back to ``str(error)`` otherwise, so exceptions
        without a ``message`` attribute no longer break the handlers.
        """
        message = getattr(error, 'message', None)
        if isinstance(message, str) and message:
            return message
        return str(error)

    def create_sec_policy(self, source_ip: str, description: str, security_group_id: str, ) -> None:
        """Authorize an accept-all ingress rule for *source_ip*.

        Args:
            source_ip: Source CIDR/IP the rule applies to.
            description: Description stored on the rule.
            security_group_id: Security group that receives the rule.

        The API response, or the error text on failure, is printed.
        """
        permission = ecs_20140526_models.AuthorizeSecurityGroupRequestPermissions(
            policy='accept',
            priority='1',
            port_range='-1/-1',
            ip_protocol='ALL',
            source_cidr_ip=source_ip,
            description=description,
        )
        request = ecs_20140526_models.AuthorizeSecurityGroupRequest(
            region_id=self.region_id,
            security_group_id=security_group_id,
            permissions=[permission],
        )
        runtime = util_models.RuntimeOptions()
        try:
            resp = self.client.authorize_security_group_with_options(request, runtime)
            print(resp)
        except Exception as error:
            print(self._error_text(error))

    @staticmethod
    def _expired_source_ips(permissions, description, now, max_age_hours=None) -> List:
        """Select source IPs of matching rules older than the age limit.

        Args:
            permissions: Iterable of rule dicts with ``Description``,
                ``CreateTime`` and ``SourceCidrIp`` keys.
            description: Only rules whose description equals this string
                are considered.
            now: Timezone-aware reference time.
            max_age_hours: Age limit in hours; defaults to
                ``RULE_MAX_AGE_HOURS``.

        Returns:
            The ``SourceCidrIp`` values of the selected rules, in input
            order.

        Raises:
            ValueError: If a matching rule has a ``CreateTime`` that does
                not follow ``CREATE_TIME_FORMAT``.
        """
        if max_age_hours is None:
            max_age_hours = AutoModSecPolicy.RULE_MAX_AGE_HOURS
        cutoff = now - datetime.timedelta(hours=max_age_hours)

        expired = []
        for rule in permissions:
            # Filter on description first so unrelated rules are never parsed.
            if rule.get('Description') != description:
                continue
            source_ip = rule.get('SourceCidrIp')
            created_raw = rule.get('CreateTime')
            # Skip rules lacking a field instead of aborting the whole scan.
            if not source_ip or not created_raw:
                continue
            # The format ends in a literal "Z", so the value is read as UTC.
            created = datetime.datetime.strptime(
                created_raw, AutoModSecPolicy.CREATE_TIME_FORMAT
            ).replace(tzinfo=datetime.timezone.utc)
            if created < cutoff:
                expired.append(source_ip)
        return expired

    def get_sec_group_ip(self, security_group_id: str, description: str, ) -> List:
        """List source IPs of expired ingress rules created with *description*.

        Args:
            security_group_id: Security group to inspect.
            description: Description that marks the rules of interest.

        Returns:
            Source IPs of ingress rules whose description matches and
            whose creation time is more than ``RULE_MAX_AGE_HOURS`` hours
            in the past.  An empty list is returned (and the error text
            printed) when the lookup fails.
        """
        request = ecs_20140526_models.DescribeSecurityGroupAttributeRequest(
            security_group_id=security_group_id,
            region_id=self.region_id,
            direction='ingress',
        )
        runtime = util_models.RuntimeOptions()
        try:
            resp = self.client.describe_security_group_attribute_with_options(
                request, runtime
            ).to_map()
            permissions = resp['body']['Permissions']['Permission']
            now = datetime.datetime.now(datetime.timezone.utc)
            return self._expired_source_ips(permissions, description, now)
        except Exception as error:
            print(self._error_text(error))
            return []

    def del_sec_group_ip(self, ip_lists: list, security_group_id: str, description: str, ) -> None:
        """Revoke the accept-all ingress rules for every IP in *ip_lists*.

        Args:
            ip_lists: Source IPs whose rules should be revoked.  When
                empty (or ``None``) nothing is sent and a notice is
                printed.
            security_group_id: Security group that holds the rules.
            description: Description set on each revoke permission.

        The API response, or the error text on failure, is printed.
        """
        if not ip_lists:
            print("not found ip!")
            return

        permissions = [
            ecs_20140526_models.RevokeSecurityGroupRequestPermissions(
                policy='accept',
                priority='1',
                ip_protocol='ALL',
                port_range='-1/-1',
                source_cidr_ip=ip,
                description=description,
            )
            for ip in ip_lists
        ]
        request = ecs_20140526_models.RevokeSecurityGroupRequest(
            region_id=self.region_id,
            security_group_id=security_group_id,
            permissions=permissions,
        )
        runtime = util_models.RuntimeOptions()
        try:
            resp = self.client.revoke_security_group_with_options(request, runtime)
            print(resp)
        except Exception as error:
            print(self._error_text(error))

    @staticmethod
    def _resolve_iframe_url(src) -> str:
        """Turn the iframe ``src`` value into an absolute URL.

        The value is resolved against ``IP_LOOKUP_URL``, so a
        scheme-relative value such as ``//2022.ip138.com/`` inherits the
        ``http`` scheme while any path component is preserved.

        Raises:
            ValueError: If *src* is empty or ``None``.
        """
        if not src:
            raise ValueError("no iframe src found on the IP lookup page")
        return urllib.parse.urljoin(AutoModSecPolicy.IP_LOOKUP_URL, src)

    @staticmethod
    def get_client_public_ip():
        """Return the caller's public IP as reported by ip138.com.

        The landing page embeds the real result page in an iframe whose
        host changes over time (e.g. 2019.ip138.com, 2022.ip138.com), so
        the iframe ``src`` is read first and then fetched.

        Returns:
            The text of the first ``<a>`` inside ``body p`` on the result
            page.

        Raises:
            ValueError: If the landing page has no iframe ``src``.
        """
        opener = urllib.request.build_opener()
        opener.addheaders = [("User-Agent", AutoModSecPolicy.USER_AGENT)]
        timeout = AutoModSecPolicy.HTTP_TIMEOUT

        # Step 1: find where the iframe points.
        with opener.open(AutoModSecPolicy.IP_LOOKUP_URL, timeout=timeout) as landing:
            landing_doc = pq(landing.read())
        url = AutoModSecPolicy._resolve_iframe_url(
            landing_doc('iframe').eq(0).attr('src')
        )

        # Step 2: fetch the result page and read the first link's text.
        with opener.open(url, timeout=timeout) as page:
            result_doc = pq(page.read().decode('utf8'))
        return result_doc('body p a').eq(0).text()
if __name__ == '__main__':
    # Script entry point: drop expired auto-created rules from the security
    # group, then allow the current public IP of this machine.
    import os

    region_id = 'cn-guangzhou'
    # Derive the endpoint from the region so the two cannot drift apart.
    endpoint = f'ecs.{region_id}.aliyuncs.com'
    # Prefer credentials from the environment; the literals are placeholders.
    access_key_id = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_ID', 'xxxx')
    access_key_secret = os.environ.get('ALIBABA_CLOUD_ACCESS_KEY_SECRET', 'xxxxx')

    security_group_id = 'sg-xxxxx'
    description_by_create = 'auto create by generator'

    # Build the helper object.
    auto_obj = AutoModSecPolicy(region_id, endpoint, access_key_id, access_key_secret)

    # The OpenAPI is rate limited, so pause briefly between calls.
    time.sleep(1)
    ip_lists = auto_obj.get_sec_group_ip(security_group_id=security_group_id,
                                         description=description_by_create)
    print('Cleanning IP:', ip_lists)

    time.sleep(1)
    auto_obj.del_sec_group_ip(ip_lists=ip_lists,
                              security_group_id=security_group_id,
                              description=description_by_create)

    source_ip = auto_obj.get_client_public_ip()
    if source_ip:
        print("Adding IP", source_ip)
        auto_obj.create_sec_policy(source_ip=source_ip,
                                   security_group_id=security_group_id,
                                   description=description_by_create)
    else:
        # An empty lookup result must not be sent as a rule's source CIDR.
        print("Could not determine public IP; no rule added")
|