Try some refactoring. Needs testing.

This commit is contained in:
Manuel Friedli 2021-07-15 21:51:14 +02:00
parent f7f833e289
commit 0062ae0b2b

View file

@ -26,7 +26,6 @@ class LookupException(Exception):
""" """
pass pass
def get_county_code(ipaddress, dbfile): def get_county_code(ipaddress, dbfile):
""" """
Determine the country code that the given ipaddress comes from. Determine the country code that the given ipaddress comes from.
@ -38,7 +37,7 @@ def get_county_code(ipaddress, dbfile):
raise LookupException("No address given") raise LookupException("No address given")
if not dbfile: if not dbfile:
raise LookupException("No db file given") raise LookupException("No db file given")
reader = None reader = None
try: try:
reader = geoip2.database.Reader(dbfile) reader = geoip2.database.Reader(dbfile)
@ -50,10 +49,9 @@ def get_county_code(ipaddress, dbfile):
country = reader.country(ipaddress).country country = reader.country(ipaddress).country
# ASN is not supported # ASN is not supported
# elif dbfile == 'GeoLite2-ASN' or dbtype == 'GeoIP2-ASN': # elif dbfile == 'GeoLite2-ASN' or dbtype == 'GeoIP2-ASN':
else:
if not country:
raise LookupException("Unsupported DB type: " + dbtype) raise LookupException("Unsupported DB type: " + dbtype)
return country.iso_code return country.iso_code
except FileNotFoundError as e: except FileNotFoundError as e:
raise LookupException(e.args) raise LookupException(e.args)
@ -68,14 +66,38 @@ def get_county_code(ipaddress, dbfile):
reader.close() reader.close()
def get_details(ipaddress, dbfile): def get_details(ipaddress, dbfile):
"""
Determine the country code, continent code and the network that the given ipaddress comes from.
:param ipaddress: The IP address to look up
:param dbfile: The path to the GeoIP2/GeoLite2 database file (Country or City database)
:return: A string consisting of the ISO country code (2 letters), ISO continent code (2 letters) and network (CIDR notation), concatenated by ","
"""
if not ipaddress:
raise LookupException("No address given")
if not dbfile:
raise LookupException("No db file given")
reader = None reader = None
try: try:
reader = geoip2.database.Reader(dbfile) reader = geoip2.database.Reader(dbfile)
res = reader.city(ipaddress) dbtype = reader.metadata().database_type
country = res.country.iso_code result = None
continent = res.continent.code country = None
network = res.traits.network continent = None
return "%s|%s|%s" % (country, continent, network) network = None
if dbtype == 'GeoLite2-City' or dbtype == 'GeoIP2-City':
result = reader.city(ipaddress)
elif dbfile == 'GeoLite2-Country' or dbtype == 'GeoIP2-Country':
result = reader.country(ipaddress)
# ASN is not supported
# elif dbfile == 'GeoLite2-ASN' or dbtype == 'GeoIP2-ASN':
else:
raise LookupException("Unsupported DB type: " + dbtype)
country = result.country.iso_code
continent = result.continent.code
network = result.traits.network
return "%s,%s,%s" % (country, continent, network)
finally: finally:
if reader: if reader:
reader.close() reader.close()
@ -90,17 +112,18 @@ def parse_command_line(argv):
dbfile = None dbfile = None
parser = argparse.ArgumentParser(description='Get the country code from an IP address') parser = argparse.ArgumentParser(description='Get the country code from an IP address')
parser.add_argument('-f', dest='dbfile', required=True, help="Path to the GeoIP2 database file") parser.add_argument('-f', dest='dbfile', required=True, help="Path to the GeoIP2 database file")
parser.add_argument('--details', dest='detail', action='store_const', const=True, default=False, help="Verbose output: Print continent code and network along with country code") parser.add_argument('-d', dest='detail', action='store_const', const=True, default=False, help="Verbose output: Print continent code and network along with country code")
parser.add_argument('--detail', dest='detail', action='store_const', const=True, default=False, help="Verbose output: Print continent code and network along with country code")
parser.add_argument('address', help="The IP address to check") parser.add_argument('address', help="The IP address to check")
args = parser.parse_args() args = parser.parse_args()
return args.dbfile, args.address, args.detail return args.dbfile, args.address, args.detail
def main(argv): def main(argv):
""" """
Read the database file and the IP address from the command line and print the corresponding ISO country code on Read the database file and the IP address from the command line and print the corresponding ISO country code on
stdout. stdout.
:param argv: Format: "-f /path/to/database.mmdb ip.v4.add.ress" :param argv: Format: "-f /path/to/database.mmdb [--detail|-d] ip.v4.add.ress"
:return: :return:
""" """
try: try:
@ -109,13 +132,12 @@ def main(argv):
code = get_details(ipaddress, dbfile) code = get_details(ipaddress, dbfile)
else: else:
code = get_county_code(ipaddress, dbfile) code = get_county_code(ipaddress, dbfile)
print(code) print(code)
except LookupException as e: except LookupException as e:
print(e.args, file=sys.stderr) print(e.args, file=sys.stderr)
print("Unknown") print("Unknown")
if __name__ == '__main__': if __name__ == '__main__':
try: try:
main(sys.argv[1:]) main(sys.argv[1:])