In short, this talk explains why you get the following warning in vt-py:
>>> client = vt.Client(apikey="dummy")
>>> obj = client.get_object("/files/44d88612fea8a8f36de82e1278abb02f")
>>> exit()
Unclosed connector
And how to use it well.
This is the official Python client library for VirusTotal. With this library you can interact with the VirusTotal REST API v3 and automate your workflow quickly and efficiently.
pip install vt-py
vt-py can:
vt-py is an asyncio-native client library (based on aio-libs/aiohttp) and is type-hinted by default. It provides powerful features for working with the VT REST API.
asyncio is a library to write concurrent code using the async/await syntax. asyncio is used as a foundation for multiple Python asynchronous frameworks that provide high-performance network and web-servers, database connection libraries, distributed task queues, etc. asyncio is often a perfect fit for IO-bound and high-level structured network code.
(Source: https://devopedia.org/asynchronous-programming-in-python)
Python has support for optional "type hints" (also called "type annotations"). These "type hints" or annotations are a special syntax that allow declaring the type of a variable. By declaring types for your variables, editors and tools can give you better support.
Inline script metadata allows you to define the dependencies required for a script. pypa/hatch, astral-sh/uv, etc. support it. Running a script that has inline script metadata with uv run installs its dependencies in an isolated venv per script under ~/.cache/uv.
# /// script
# dependencies = [
# "vt-py==0.19.0",
# ]
# ///
import vt
print(vt.__version__)
print(vt.__file__)
$ uv run test.py
Installed 11 packages in 13ms
0.19.0
~/.cache/uv/environments-v2/test-8507df15a68ed868/lib/python3.12/site-packages/vt/__init__.py
import vt
with vt.Client(apikey="...") as client:
    obj = client.get_object("/files/44d88612fea8a8f36de82e1278abb02f")
It looks non-async. But asyncio works under the hood.
# vt-py/vt/client.py
class Client:
    ...
    def get_object(
        self,
        path: str,
        *path_args: typing.Any,
        params: typing.Optional[typing.Dict] = None,
    ) -> Object:
        ...
        return make_sync(self.get_object_async(path, *path_args, params=params))
# vt-py/vt/utils.py
import asyncio
import typing
def make_sync(future: typing.Coroutine):
    """Utility function that waits for an async call, making it sync."""
    try:
        event_loop = asyncio.get_event_loop()
    except RuntimeError:
        # Generate an event loop if there isn't any.
        event_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(event_loop)
    return event_loop.run_until_complete(future)
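To see the sync-over-async bridge in isolation, here is a minimal sketch that runs a coroutine to completion from ordinary synchronous code. The names fake_request and run_sync are hypothetical and not part of vt-py; unlike make_sync, this sketch always creates and closes its own event loop.

```python
import asyncio
import typing


async def fake_request(path: str) -> dict:
    """Hypothetical stand-in for an async HTTP call."""
    await asyncio.sleep(0)  # yield to the event loop once, as real I/O would
    return {"path": path}


def run_sync(coro: typing.Coroutine) -> typing.Any:
    """Block until the coroutine finishes and return its result."""
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        loop.close()  # release the loop's resources


result = run_sync(fake_request("/files/abc"))
print(result)
```

The caller never writes await, which is why the sync API of a client built this way looks non-async from the outside.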
asyncio version
async with vt.Client(apikey="...") as client:
    obj = await client.get_object_async("/files/44d88612fea8a8f36de82e1278abb02f")
hashes: list[str] = ["..."]
async with vt.Client(apikey="...") as client:
    coroutines = [client.get_object_async(f"/files/{h}") for h in hashes]
    objects: list[vt.Object] = await asyncio.gather(*coroutines)
    for obj in objects:
        print(obj.id, obj.type)
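The concurrent lookup pattern can be tried without an API key. The sketch below is an assumption-laden stand-in: fake_lookup is a hypothetical coroutine that simulates latency in place of client.get_object_async, and it returns plain dicts rather than vt.Object instances.

```python
import asyncio


async def fake_lookup(file_hash: str) -> dict:
    """Hypothetical stand-in for client.get_object_async()."""
    await asyncio.sleep(0.01)  # simulated network latency
    return {"id": file_hash, "type": "file"}


async def lookup_all(hashes: list[str]) -> list[dict]:
    # gather() runs the coroutines concurrently and returns
    # the results in the same order as the inputs.
    return await asyncio.gather(*(fake_lookup(h) for h in hashes))


objects = asyncio.run(lookup_all(["aaa", "bbb", "ccc"]))
print([obj["id"] for obj in objects])
```

Because the waits overlap, the total time stays close to one simulated request rather than three.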
with statement?
The with statement is used to wrap the execution of a block with methods defined by a context manager.
When the block is left, the context manager's __exit__() method is invoked. If an exception caused the suite to be exited, its type, value, and traceback are passed as arguments to __exit__(). Otherwise, three None arguments are supplied.
# vt-py/vt/client.py
class Client:
    ...
    async def __aenter__(self):
        return self
    async def __aexit__(self, item_type, value, traceback):
        await self.close_async()
    def __enter__(self):
        return self
    def __exit__(self, item_type, value, traceback):
        self.close()
    async def close_async(self) -> None:
        """Like :func:`close` but returns a coroutine."""
        # Using getattr(self, '_session', None) instead of self._session because
        # close_async can be called from __del__ when the object is not yet
        # initialized and therefore the object doesn't have a _session. Calling
        # self._session in that case would raise AttributeError. See:
        # https://github.com/VirusTotal/vt-py/issues/125#issue-1449917146
        session = getattr(self, "_session", None)
        if session:
            await session.close()
            self._session = None
    def close(self) -> None:
        """Closes the client.
        When the client is not needed anymore it must be closed for releasing
        resources like TCP connections.
        Not closing the client after it's used might show error tracebacks
        indicating it was not properly closed.
        """
        return make_sync(self.close_async())
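The __enter__/__exit__ contract described above can be observed with a toy class. This is a minimal sketch: SketchClient is a hypothetical class that only records whether it was closed and what __exit__ received, and it opens no connections.

```python
class SketchClient:
    """Hypothetical client that only tracks whether it was closed."""

    def __init__(self) -> None:
        self.closed = False
        self.exit_args = None

    def close(self) -> None:
        self.closed = True

    def __enter__(self):
        return self

    def __exit__(self, exc_type, value, traceback):
        # Record what the with statement passed in, then release resources.
        # Returning None lets any exception keep propagating.
        self.exit_args = (exc_type, value, traceback)
        self.close()


with SketchClient() as ok_client:
    pass  # normal exit: __exit__ receives three None arguments

failed_client = SketchClient()
try:
    with failed_client:
        raise ValueError("boom")  # __exit__ still runs before this propagates
except ValueError:
    pass

print(ok_client.closed, ok_client.exit_args)
print(failed_client.closed, failed_client.exit_args[0].__name__)
```

In both cases the client ends up closed. The REPL session at the top of this talk has no such guarantee, because it never calls close().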
There is also Iterator, which is suitable for handling a continuation cursor (e.g. Intelligence search).
{
"meta": {
"cursor": "..."
},
"data": []
}
# Intelligence search
for obj in client.iterator(
    "/intelligence/search", params={"query": "..."}, batch_size=10, limit=100
):
    print(obj)
# IoC stream
for obj in client.iterator("/ioc-stream"):
    print(obj)
# async ver.
async for obj in client.iterator(
    "/intelligence/search", params={"query": "..."}, batch_size=10, limit=100
):
    print(obj)
batch_size: Maximum number of objects retrieved on each call to the endpoint. If not provided, the server decides how many objects to return.
limit: Maximum number of objects that will be returned by the iterator. If a limit is not provided, the iterator continues until it reaches the last object in the collection.
batch_size=10, limit=100 iterates over the most recent 100 objects, retrieving them in batches of 10.
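How batch_size and limit interact with a continuation cursor can be sketched against an in-memory collection. Everything here is hypothetical (ITEMS, fake_endpoint, iterate) and only models the idea; it is not how vt-py implements its Iterator.

```python
from typing import Iterator, Optional

ITEMS = [f"obj-{i}" for i in range(25)]  # hypothetical server-side collection


def fake_endpoint(cursor: Optional[str], batch_size: int) -> dict:
    """Hypothetical endpoint: returns one page plus a cursor to the next one."""
    start = int(cursor) if cursor else 0
    end = start + batch_size
    # The cursor is omitted on the last page.
    meta = {"cursor": str(end)} if end < len(ITEMS) else {}
    return {"data": ITEMS[start:end], "meta": meta}


def iterate(batch_size: int, limit: Optional[int] = None) -> Iterator[str]:
    """Yield items page by page until the cursor runs out or limit is reached."""
    cursor = None
    count = 0
    while True:
        page = fake_endpoint(cursor, batch_size)
        for item in page["data"]:
            if limit is not None and count >= limit:
                return
            yield item
            count += 1
        cursor = page["meta"].get("cursor")
        if cursor is None:
            return


first_twelve = list(iterate(batch_size=10, limit=12))
everything = list(iterate(batch_size=10))
print(len(first_twelve), len(everything))
```

With limit=12 the sketch fetches two pages and stops partway through the second. Without a limit it follows the cursor until the endpoint stops returning one.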
for obj in client.feed(vt.FeedType.URLS):
    print(obj)
# async ver.
async for obj in client.feed(vt.FeedType.URLS):
    print(obj)
VT object data layout:
{
"data": {
"attributes": {},
"context_attributes": {},
"relationships": {},
"id": "...",
"links": {
"self": "https://www.virustotal.com/ui/..."
},
"type": "..."
}
}
id: ID.
type: Object type.
links: Links to self, next, etc.
attributes: Object attributes.
context_attributes: Context attributes. Automatically added by VT through analysis? Optional.
relationships: Links or dependencies between objects. Should be explicitly requested via the relationships query parameter. Optional.
FileBehavior context_attributes (Feed only?)
The FileBehavior object will contain an extra attribute (context_attributes), which is a JSON structure that contains links for downloading the PCAP, HTML, EVTX and memdump files generated in the analysis through our API without consuming your quota (bear in mind that you will have to use your API Key and add it to the request headers in order to get access to the behaviour reports pointed by those two links).
--- https://docs.virustotal.com/reference/feeds-file-behaviour
"context_attributes": {
"file_md5": "...",
"file_sha1": "...",
"file_type_tag": "...",
"html_report": "https://www.virustotal.com/api/v3/feeds/file-behaviours/<TOKEN>/html",
"pcap": "https://www.virustotal.com/api/v3/feeds/file-behaviours/<TOKEN>/pcap",
"evtx": "https://www.virustotal.com/api/v3/feeds/file-behaviours/<TOKEN>/evtx",
"memdump": "https://www.virustotal.com/api/v3/feeds/file-behaviours/<TOKEN>/memdump"
}
IP address relationships
# https://www.virustotal.com/api/v3/{collection name}/{object id}?relationships={relationship 1},{relationship 2}
client.get_object(
    "/ip_addresses/...",
    params={"relationships": "communicating_files,downloaded_files"},
)
communicating_files: lists all files presenting any sort of traffic to the given IP address at some point of its execution.
downloaded_files: returns a list of files that were available from a URL under the given IP address at some moment.
"relationships": {
"communicating_files": {
"data": [
{
"id": "...",
"type": "file"
}
],
"links": {}
},
"downloaded_files": {
"data": [
{
"id": "...",
"type": "file"
}
],
"links": {}
}
}
vt.Object attributes:
id: SHA256 (file), domain (domain), URL SHA256 (url), etc.
type: file, domain, url, etc.
context_attributes
relationships
>>> obj = client.get_object("/files/275a021bbfb6489e54d471899f7db9d1663fc695ec2fe2a2c4538aabf651fd0f")
>>> obj.type
file
>>> obj.id
275a021bbfb6489e54d471899f7db9d1663fc695ec2fe2a2c4538aabf651fd0f
>>> obj.context_attributes
{}
>>> obj.relationships
{}
attributes are stored as attributes of the Object instance, and you can access them via the dot notation or the get method.
{
"data": {
"attributes": {
"reputation": 0,
"first_seen_itw_date": 1582585760,
"last_analysis_stats": {
"malicious": 0,
...
}
...
},
"id": "...",
"type": "file"
}
}
>>> obj.reputation
0
>>> obj.get("reputation")
0
Note that there is a difference between #get and the dot notation.
The get method returns the original value.
The dot notation returns a datetime.datetime if the attribute name matches any of:
^.+_date$
^date$
^last_login$
^user_since$
>>> obj.first_seen_itw_date
datetime.datetime(2020, 2, 24, 23, 9, 20)
>>> obj.get("first_seen_itw_date")
1582585760
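The difference can be modeled in a few lines. This is a simplified sketch, not vt-py's actual implementation: SketchObject and DATE_PATTERNS are hypothetical names, and the sketch assumes the stored integer is a Unix timestamp interpreted as UTC.

```python
import datetime
import re

# The patterns listed above; everything else is a hypothetical model.
DATE_PATTERNS = [
    re.compile(p)
    for p in (r"^.+_date$", r"^date$", r"^last_login$", r"^user_since$")
]


class SketchObject:
    def __init__(self, attributes: dict) -> None:
        self._attributes = attributes

    def get(self, name: str, default=None):
        # Returns the raw value exactly as stored.
        return self._attributes.get(name, default)

    def __getattr__(self, name: str):
        # Only called when normal attribute lookup fails.
        try:
            value = self._attributes[name]
        except KeyError:
            raise AttributeError(name) from None
        if any(p.match(name) for p in DATE_PATTERNS):
            # Convert the Unix timestamp to a naive datetime in UTC.
            utc = datetime.datetime.fromtimestamp(value, tz=datetime.timezone.utc)
            return utc.replace(tzinfo=None)
        return value


obj = SketchObject({"reputation": 0, "first_seen_itw_date": 1582585760})
print(obj.get("first_seen_itw_date"))
print(obj.first_seen_itw_date)
```

Use get when you need the raw integer (for example, to store it or compare it with another API response), and the dot notation when you want a datetime to format or do arithmetic on.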
Also note that the dot notation only supports top level attributes.
>>> obj.last_analysis_stats
{'malicious': 0, ...}
>>> obj.last_analysis_stats.malicious
AttributeError: 'WhistleBlowerDict' object has no attribute 'malicious'
#to_dict returns the whole object data as a dict.
>>> obj.to_dict()
{
'type': 'file',
'id': '275a021bbfb6489e54d471899f7db9d1663fc695ec2fe2a2c4538aabf651fd0f',
'attributes': { ... },
}
NOTE
The #to_dict result is a dict, but it contains collections.UserDict (= vt.object.WhistleBlowerDict) instances inside. Thus converting it to JSON is a bit tricky. See the appendix below for details.
IMPORTANT
Update: the above issue is fixed in the latest version (0.20.0).
#to_dict to JSON
IMPORTANT
Update: you need the following hack only when using vt-py <0.20.0. >=0.20.0 doesn't need the hack.
You have to use UserDictJsonEncoder when serializing the #to_dict result as JSON. Otherwise you will get TypeError: Type is not JSON serializable: WhistleBlowerDict.
import json
import vt
from vt.object import UserDictJsonEncoder
obj = vt.Object(...)
json_str = json.dumps(obj.to_dict(), cls=UserDictJsonEncoder)
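Why a custom encoder is needed can be reproduced with the standard library alone. In this sketch, SketchDict and SketchEncoder are hypothetical stand-ins for WhistleBlowerDict and UserDictJsonEncoder; the real classes in vt-py may differ in detail.

```python
import collections
import json


class SketchDict(collections.UserDict):
    """Hypothetical stand-in for a UserDict subclass nested in the data."""


class SketchEncoder(json.JSONEncoder):
    def default(self, o):
        # Called only for objects that json cannot serialize natively.
        if isinstance(o, collections.UserDict):
            return o.data  # the plain dict wrapped by UserDict
        return super().default(o)


data = {"attributes": SketchDict({"last_analysis_stats": SketchDict({"malicious": 0})})}

try:
    json.dumps(data)
    plain_dumps_failed = False
except TypeError:
    # UserDict is not a dict subclass, so the default encoder rejects it.
    plain_dumps_failed = True

json_str = json.dumps(data, cls=SketchEncoder)
print(plain_dumps_failed, json_str)
```

The encoder hands back the wrapped dict, and json then recurses into it, so nested UserDict instances are handled as well.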
NOTE
I created a PR (vt-py#211) to address this issue, and I hope it will ship in an upcoming version soon.
IMPORTANT
Update: it's been merged and shipped as v0.20.0!
I recommend using dictpath when working with complex or nested attributes.
Bad (Or not so good)
# can raise AttributeError or KeyError
obj.pe_info["imphash"]
# redundant
obj.get("pe_info", {}).get("imphash")
Good (Or better)
Use dictpath, a tiny wrapper of h2non/jsonpath-ng.
{
"data": {
"id": "...",
"type": "file",
"attributes": {
"pe_info": {
"sections": [
{
"name": "UPX0",
"chi2": -1.0,
"virtual_address": 4096,
"entropy": 0.0,
"raw_size": 0,
"flags": "rwx",
"virtual_size": 1458176,
"md5": "d41d8cd98f00b204e9800998ecf8427e"
},
...
]
},
"sandbox_verdicts": {
"Zenbox": {
"category": "harmless",
"confidence": 1,
"sandbox_name": "Zenbox",
"malware_classification": [
"CLEAN"
]
},
...
}
...
}
}
}
# "pip install jsonpath-ng" is needed
>>> import dictpath
>>> obj = vt.Object(...)
>>> data = obj.to_dict()
>>> dictpath.get_all(data, "$.attributes.pe_info.sections[*].md5")
['d41d8cd98f00b204e9800998ecf8427e', ...]
>>> dictpath.get_all(data, "$.attributes.sandbox_verdicts.*.sandbox_name")
['Zenbox', ...]
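If adding a dependency is not an option, the same idea (follow a path, tolerate missing keys, fan out over children) can be approximated in plain Python. The helper below is a hypothetical sketch with a made-up find_all name and a list-of-keys path format; it is not how dictpath or jsonpath-ng work internally and supports only exact keys and a "*" wildcard.

```python
from typing import Any


def find_all(data: Any, keys: list[str]) -> list[Any]:
    """Collect every value reachable by following keys; "*" matches all children."""
    current = [data]
    for key in keys:
        next_level = []
        for node in current:
            if key == "*":
                if isinstance(node, dict):
                    next_level.extend(node.values())
                elif isinstance(node, list):
                    next_level.extend(node)
            elif isinstance(node, dict) and key in node:
                next_level.append(node[key])
            # Anything else is silently skipped instead of raising.
        current = next_level
    return current


sample = {
    "attributes": {
        "pe_info": {"sections": [{"name": "UPX0", "md5": "d41d8cd98f00b204e9800998ecf8427e"}]},
        "sandbox_verdicts": {"Zenbox": {"sandbox_name": "Zenbox"}},
    }
}

md5s = find_all(sample, ["attributes", "pe_info", "sections", "*", "md5"])
names = find_all(sample, ["attributes", "sandbox_verdicts", "*", "sandbox_name"])
missing = find_all(sample, ["attributes", "no_such_key", "imphash"])
print(md5s, names, missing)
```

A missing key yields an empty list rather than a KeyError, which is the property that makes path-based access convenient for sparse VT attributes.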
curl/xh ver.
NOTE
ducaale/xh: Friendly and fast tool for sending HTTP requests.
# brew install xh jq
# or
# choco install xh jq
$ xh https://www.virustotal.com/api/v3/intelligence/search x-apikey:... query=="entity:url ..."
{
"data": [
{
"id": "...",
"type": "url",
"links": {},
"attributes": {},
"context_attributes": {
"url": "..."
}
}
],
"meta": {
"cursor": "...",
...
},
}
$ xh https://www.virustotal.com/api/v3/intelligence/search x-apikey:... query=="entity:url ..." | jq -r ".data[].context_attributes.url"
Simple enough. But how do you handle the continuation cursor (meta.cursor)?
vt-cli ver.
# brew install virustotal-cli
# or
# choco install vt-cli
$ vt search "entity:url ..." --format json | jq -r ".[]._context_attributes.url"
MORE WITH:
vt search 'entity:url p:1+' --format=json --cursor=...
...
vt-py ver.
import vt
with vt.Client(apikey="...") as client:
    for obj in client.iterator(
        "/intelligence/search", params={"query": "entity:url ...."}
    ):
        url: str | None = obj.context_attributes.get("url")
        if url is None:
            continue
        do_something(url)
# /// script
# dependencies = [
# "arakawa",
# "starlette",
# "vt-py>=0.20.0",
# ]
# ///
import collections
import arakawa as ar
import pandas as pd
import vt
from starlette.config import Config
from starlette.datastructures import Secret
config = Config(".env")
VT_API_KEY: Secret = config("VT_API_KEY", cast=Secret, default="") # type: ignore
with vt.Client(str(VT_API_KEY)) as client:
    objects = list(
        client.iterator(
            "/intelligence/search", params={"query": "entity:url p:10+"}, limit=10
        )
    )
def extract_attributes_with_url(obj: vt.Object) -> dict:
    # extract attributes and add url in context_attributes
    attributes = obj.to_dict()["attributes"]
    attributes["url"] = obj.context_attributes.get("url")
    return attributes
data = [extract_attributes_with_url(obj) for obj in objects]
df = pd.json_normalize(data)
ar.Report(
    ar.DataTable(df),
    df["title"].value_counts().to_frame(),
    df["favicon.raw_md5"].value_counts().to_frame(),
).save("sample.html", open=True)
# /// script
# dependencies = [
# "loguru",
# "starlette",
# "typer",
# "vt-py",
# ]
# ///
# Forked from https://github.com/VirusTotal/vt-py/blob/master/examples/intelligence_search_to_network_infrastructure.py
#
# Copyright © 2019 The vt-py authors under the Apache License version 2.0.
"""VT Intelligence searches to network IoCs.
This is a script to showcase how programmatic VT Intelligence searches can be
combined with file sandbox behavior lookups in order to generate network
indicators of compromise that can be fed into network perimeter defenses.
Read more:
https://www.virustotal.com/gui/intelligence-overview
https://docs.virustotal.com/reference/search
https://docs.virustotal.com/docs/virustotal-intelligence-introduction
"""
import asyncio
import re
from collections import defaultdict
from typing import Annotated, Any, DefaultDict, Literal, TypedDict, cast
import typer
import vt
from loguru import logger
from starlette.config import Config
from starlette.datastructures import Secret
config = Config(".env")
VT_API_KEY: Secret | None = config("VT_API_KEY", cast=Secret, default=None)
NetworkSingularType = Literal["domain", "ip", "url"]
class ContactedAddress(TypedDict):
    type: NetworkSingularType
    id: str
    context_attributes: dict[str, str] | None
NetworkPluralType = Literal["domains", "ips", "urls"]
class NetworkContainer(TypedDict):
    contacted_addresses: list[ContactedAddress]
    type: NetworkPluralType
    file: str
class VTISearchToNetworkInfrastructureHandler:
    """Class for handling the process of analyzing VTI search matches."""
    _SEARCH_ENTITY_REGEX = re.compile(r"entity: (\w+)")
    def __init__(self, apikey: Secret):
        self.apikey = apikey
        self.queue: asyncio.Queue[NetworkContainer] = asyncio.Queue()
        self.files_queue: asyncio.Queue[str] = asyncio.Queue()
        self.networking_counters: dict[NetworkPluralType, defaultdict[str, int]] = {
            "domains": defaultdict(lambda: 0),
            "ips": defaultdict(lambda: 0),
            "urls": defaultdict(lambda: 0),
        }
        self.networking_infrastructure: DefaultDict[
            str, DefaultDict[NetworkPluralType, Any]
        ] = defaultdict(lambda: defaultdict(lambda: {}))
    async def get_file_async(self, checksum: str, *, relationships: str | None = None):
        """Look up a file object."""
        async with vt.Client(str(self.apikey)) as client:
            return await client.get_object_async(
                f"/files/{checksum}", params={"relationships": relationships}
            )
    async def get_matching_files(self, query: str, max_files: int):
        """Query intelligence for files matching the given criteria."""
        entity_match = self._SEARCH_ENTITY_REGEX.match(query.lower())
        if entity_match and entity_match.group(1) != "file":
            raise ValueError("Only file search queries are valid in this example.")
        async with vt.Client(str(self.apikey)) as client:
            query = query.lower()
            url = "/intelligence/search"
            logger.info("Performing VT Intelligence search...")
            files = client.iterator(url, params={"query": query}, limit=max_files)
            async for matching_file in files:
                sha256 = cast(str, matching_file.sha256)
                await self.files_queue.put(sha256)
            logger.info(
                "Search concluded, waiting on network infrastructure retrieval..."
            )
    async def get_network(self):
        """Retrieve the network infrastructure related to matching files."""
        while True:
            checksum = await self.files_queue.get()
            file_obj = await self.get_file_async(
                checksum, relationships="contacted_domains,contacted_ips,contacted_urls"
            )
            relationships = file_obj.relationships
            contacted_domains = relationships["contacted_domains"]["data"]
            contacted_urls = relationships["contacted_urls"]["data"]
            contacted_ips = relationships["contacted_ips"]["data"]
            await self.queue.put(
                {
                    "contacted_addresses": contacted_domains,
                    "type": "domains",
                    "file": checksum,
                }
            )
            await self.queue.put(
                {
                    "contacted_addresses": contacted_ips,
                    "type": "ips",
                    "file": checksum,
                }
            )
            await self.queue.put(
                {
                    "contacted_addresses": contacted_urls,
                    "type": "urls",
                    "file": checksum,
                }
            )
            self.networking_infrastructure[checksum]["domains"] = contacted_domains
            self.networking_infrastructure[checksum]["ips"] = contacted_ips
            self.networking_infrastructure[checksum]["urls"] = contacted_urls
            self.files_queue.task_done()
    async def build_network(self):
        """Build the stats of the network infrastructure."""
        while True:
            item = await self.queue.get()
            item_type = item["type"]
            for contacted_address in item["contacted_addresses"]:
                if item_type in ("domains", "ips"):
                    address = contacted_address["id"]
                else:
                    # url type should have url in context_attributes
                    address = contacted_address["context_attributes"]["url"]  # type: ignore
                self.networking_counters[item_type][address] += 1
            self.queue.task_done()
    def print_results(self):
        print("=== Results: ===")
        for item in self.networking_infrastructure.items():
            contacted_addr = item[1].values()
            if any(contacted_addr):
                for inf in item[1].items():
                    for key in inf[1]:
                        key = cast(ContactedAddress, key)
                        k = key["type"].upper()
                        context_attributes = key.get("context_attributes") or {}
                        v = context_attributes.get("url") or key["id"]
                        print(f"{k}: {v}")
def main(
    query: str,
    *,
    apikey: Annotated[
        str | None,
        typer.Option(help="Your VirusTotal API key. Defaults to VT_API_KEY ENV."),
    ] = None,
    limit: Annotated[int, typer.Option(help="Limit of files to process.")] = 10,
):
    key = apikey or VT_API_KEY
    assert key, "API key is required."
    async def inner():
        handler = VTISearchToNetworkInfrastructureHandler(Secret(str(key)))
        enqueue_files_task = asyncio.create_task(
            handler.get_matching_files(query, limit)
        )
        _ = asyncio.create_task(handler.get_network())  # noqa: RUF006
        _ = asyncio.create_task(handler.build_network())  # noqa: RUF006
        await asyncio.gather(enqueue_files_task)
        await handler.files_queue.join()
        await handler.queue.join()
        handler.print_results()
    asyncio.run(inner())
if __name__ == "__main__":
    typer.run(main)
$ uv run intelligence_search_to_network_infrastructure.py "entity:file AND sha256:7c347a029d0d8600dc58885d6f359517e093f9f13ccb9a697c2655ed5f87f4d"
=== Results: ===
DOMAIN: acroipm2.adobe.com
DOMAIN: cloud.ccm19.de
DOMAIN: content-autofill.googleapis.com
DOMAIN: edgedl.me.gvt1.com
DOMAIN: google.com
DOMAIN: i.lencr.org
DOMAIN: ka-f.fontawesome.com
DOMAIN: kit.fontawesome.com
DOMAIN: media-exp1.licdn.com
DOMAIN: media.licdn.com
DOMAIN: media.tagembed.com
DOMAIN: platform.linkedin.com
DOMAIN: s3.us-west-1.wasabisys.com
DOMAIN: widget.tagembed.com
DOMAIN: www.eicar.org
DOMAIN: www.youtube.com
DOMAIN: x1.i.lencr.org
IP_ADDRESS: 104.18.40.68
IP_ADDRESS: 104.21.26.223
IP_ADDRESS: 104.244.42.8
IP_ADDRESS: 104.26.6.103
IP_ADDRESS: 104.26.7.103
IP_ADDRESS: 104.26.9.185
IP_ADDRESS: 13.107.42.14
IP_ADDRESS: 142.250.125.94
IP_ADDRESS: 142.251.143.106
IP_ADDRESS: 142.251.143.110
IP_ADDRESS: 142.251.143.138
IP_ADDRESS: 142.251.143.142
IP_ADDRESS: 142.251.143.163
IP_ADDRESS: 142.251.143.170
IP_ADDRESS: 142.251.143.174
IP_ADDRESS: 142.251.143.202
IP_ADDRESS: 142.251.143.206
IP_ADDRESS: 148.251.5.29
IP_ADDRESS: 152.199.21.118
IP_ADDRESS: 172.64.147.188
URL: http://x1.i.lencr.org/