mirror of
https://github.com/Detanup01/gbe_fork.git
synced 2025-03-28 14:56:24 +01:00
delete gen emu config
This commit is contained in:
parent
c2d4807d94
commit
1ddaad17bb
29 changed files with 1 additions and 4265 deletions
52
.github/workflows/gen_emu_config-build-linux.yml
vendored
52
.github/workflows/gen_emu_config-build-linux.yml
vendored
|
@ -1,52 +0,0 @@
|
|||
name: "Build gen_emu_config script (Linux)"
|
||||
|
||||
on:
|
||||
workflow_call:
|
||||
# needed since it allows this to become a reusable workflow
|
||||
workflow_dispatch:
|
||||
# allows manual trigger
|
||||
|
||||
permissions:
|
||||
contents: "write"
|
||||
|
||||
env:
|
||||
ARTIFACT_NAME: "generate_emu_config-linux-${{ github.sha }}"
|
||||
SCRIPT_BASE_DIR: "tools/generate_emu_config"
|
||||
PACKAGE_BASE_DIR: "tools/generate_emu_config/bin/linux"
|
||||
|
||||
jobs:
|
||||
build:
|
||||
runs-on: "ubuntu-20.04"
|
||||
|
||||
steps:
|
||||
- name: "Checkout branch"
|
||||
uses: actions/checkout@v4
|
||||
|
||||
# env
|
||||
- name: "Install env"
|
||||
shell: "bash"
|
||||
working-directory: "${{ env.SCRIPT_BASE_DIR }}"
|
||||
run: sudo chmod 777 recreate_venv_linux.sh && sudo ./recreate_venv_linux.sh
|
||||
|
||||
# fix folder permissions! not sure why this fails
|
||||
# nested subdirs "build/linux/release" cause permission problems
|
||||
- name: "Give all permissions to repo folder"
|
||||
shell: "bash"
|
||||
working-directory: "${{ github.workspace }}"
|
||||
run: sudo chmod -R 777 "${{ github.workspace }}"
|
||||
|
||||
# build
|
||||
- name: "Rebuild"
|
||||
shell: "bash"
|
||||
working-directory: "${{ env.SCRIPT_BASE_DIR }}"
|
||||
run: sudo chmod 777 rebuild_linux.sh && ./rebuild_linux.sh
|
||||
|
||||
# upload artifact
|
||||
- name: "Upload build package"
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: "${{ env.ARTIFACT_NAME }}"
|
||||
path: "${{ env.PACKAGE_BASE_DIR }}/"
|
||||
if-no-files-found: "error"
|
||||
compression-level: 9
|
||||
retention-days: 1
|
68
.github/workflows/gen_emu_config-build-win.yml
vendored
68
.github/workflows/gen_emu_config-build-win.yml
vendored
|
@ -1,68 +0,0 @@
|
|||
name: "Build gen_emu_config script (Windows)"
|
||||
|
||||
on:
|
||||
workflow_call:
|
||||
# needed since it allows this to become a reusable workflow
|
||||
workflow_dispatch:
|
||||
# allows manual trigger
|
||||
|
||||
permissions:
|
||||
contents: "write"
|
||||
|
||||
env:
|
||||
ARTIFACT_NAME: "generate_emu_config-win-${{ github.sha }}"
|
||||
SCRIPT_BASE_DIR: "tools/generate_emu_config"
|
||||
PACKAGE_BASE_DIR: "tools/generate_emu_config/bin/win"
|
||||
|
||||
THIRD_PARTY_BASE_DIR: "third-party"
|
||||
|
||||
jobs:
|
||||
build:
|
||||
runs-on: "windows-2022"
|
||||
|
||||
steps:
|
||||
- name: "Set up Python"
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: "3.12"
|
||||
|
||||
### on Windows Git will auto change line ending to CRLF, not preferable
|
||||
- name: "Ensure LF line ending"
|
||||
shell: "cmd"
|
||||
working-directory: "${{ github.workspace }}"
|
||||
run: |
|
||||
git config --local core.autocrlf false
|
||||
git config --system core.autocrlf false
|
||||
git config --global core.autocrlf false
|
||||
|
||||
- name: "Checkout branch"
|
||||
uses: actions/checkout@v4
|
||||
|
||||
## extra helpers/tools, these are not built inside the deps build dir
|
||||
- name: "Clone third-party deps (build/win)"
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
ref: "third-party/build/win"
|
||||
path: "${{env.THIRD_PARTY_BASE_DIR}}/build/win"
|
||||
|
||||
# env
|
||||
- name: "Install env"
|
||||
shell: "cmd"
|
||||
working-directory: "${{ env.SCRIPT_BASE_DIR }}"
|
||||
run: recreate_venv_win.bat
|
||||
|
||||
# build
|
||||
- name: "Rebuild"
|
||||
shell: "cmd"
|
||||
working-directory: "${{ env.SCRIPT_BASE_DIR }}"
|
||||
run: rebuild_win.bat
|
||||
|
||||
# upload artifact
|
||||
- name: "Upload build package"
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: "${{ env.ARTIFACT_NAME }}"
|
||||
path: "${{ env.PACKAGE_BASE_DIR }}/"
|
||||
if-no-files-found: "error"
|
||||
compression-level: 9
|
||||
retention-days: 1
|
|
@ -1,22 +0,0 @@
|
|||
name: "Gen emu cfg PR"
|
||||
|
||||
on:
|
||||
pull_request:
|
||||
branches: ["dev"]
|
||||
paths:
|
||||
- "!**/*.md"
|
||||
- "tools/generate_emu_config/**"
|
||||
|
||||
permissions:
|
||||
contents: "write"
|
||||
|
||||
jobs:
|
||||
script-win:
|
||||
name: "Gen emu config win"
|
||||
if: ${{ !cancelled() }}
|
||||
uses: "./.github/workflows/gen_emu_config-build-win.yml"
|
||||
|
||||
script-linux:
|
||||
name: "Gen emu config linux"
|
||||
if: ${{ !cancelled() }}
|
||||
uses: "./.github/workflows/gen_emu_config-build-linux.yml"
|
112
.github/workflows/release.yml
vendored
112
.github/workflows/release.yml
vendored
|
@ -186,118 +186,6 @@ jobs:
|
|||
compression-level: 0
|
||||
retention-days: 7
|
||||
|
||||
gen_emu_script-win:
|
||||
name: "Gen emu config win"
|
||||
if: ${{ !cancelled() }}
|
||||
uses: "./.github/workflows/gen_emu_config-build-win.yml"
|
||||
|
||||
gen_emu_script-win-prep:
|
||||
needs: ["gen_emu_script-win"]
|
||||
runs-on: "windows-2022"
|
||||
steps:
|
||||
# on Windows Git will auto change line ending to CRLF, not preferable
|
||||
- name: "Ensure LF line ending"
|
||||
shell: "cmd"
|
||||
working-directory: "${{ github.workspace }}"
|
||||
run: |
|
||||
git config --local core.autocrlf false
|
||||
git config --system core.autocrlf false
|
||||
git config --global core.autocrlf false
|
||||
|
||||
# we need branch because it has package scripts
|
||||
- name: "Checkout branch"
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: "Clone third-party deps (deps/win)"
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
ref: "third-party/deps/win"
|
||||
path: "${{env.THIRD_PARTY_BASE_DIR}}/deps/win"
|
||||
|
||||
# download artifacts
|
||||
- name: "Download script build artifacts (Win)"
|
||||
uses: actions/download-artifact@v4
|
||||
with:
|
||||
path: "tools/generate_emu_config/bin/win"
|
||||
pattern: "generate_emu_config-win-*"
|
||||
merge-multiple: true
|
||||
|
||||
# package
|
||||
- name: "Package script"
|
||||
shell: "cmd"
|
||||
working-directory: "tools/generate_emu_config"
|
||||
run: package_win.bat
|
||||
|
||||
# release tag
|
||||
- name: "Release"
|
||||
if: startsWith(github.ref, 'refs/tags/')
|
||||
uses: softprops/action-gh-release@v2
|
||||
with:
|
||||
files: "tools/generate_emu_config/bin/package/win/**/*"
|
||||
|
||||
# upload artifact/package if this is a manual run
|
||||
- name: "Upload release package"
|
||||
if: ${{ !startsWith(github.ref, 'refs/tags/') }}
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: "release-generate_emu_config-win-${{ github.sha }}"
|
||||
path: "tools/generate_emu_config/bin/package/win/**/*"
|
||||
if-no-files-found: "error"
|
||||
compression-level: 9
|
||||
retention-days: 7
|
||||
|
||||
gen_emu_script-linux:
|
||||
name: "Gen emu config linux"
|
||||
if: ${{ !cancelled() }}
|
||||
uses: "./.github/workflows/gen_emu_config-build-linux.yml"
|
||||
|
||||
gen_emu_script-linux-prep:
|
||||
needs: ["gen_emu_script-linux"]
|
||||
runs-on: "ubuntu-20.04"
|
||||
steps:
|
||||
# we need branch because it has package scripts
|
||||
- name: "Checkout branch"
|
||||
uses: actions/checkout@v4
|
||||
|
||||
# download artifacts
|
||||
- name: "Download script build artifacts (linux)"
|
||||
uses: actions/download-artifact@v4
|
||||
with:
|
||||
path: "tools/generate_emu_config/bin/linux"
|
||||
pattern: "generate_emu_config-linux-*"
|
||||
merge-multiple: true
|
||||
|
||||
# fix folder permissions! not sure why this fails
|
||||
# nested subdirs "build/linux/release" cause permission problems
|
||||
- name: "Give all permissions to repo folder"
|
||||
shell: "bash"
|
||||
working-directory: "${{ github.workspace }}"
|
||||
run: sudo chmod -R 777 "${{ github.workspace }}"
|
||||
|
||||
# package
|
||||
- name: "Package script"
|
||||
shell: "bash"
|
||||
working-directory: "tools/generate_emu_config"
|
||||
run: sudo chmod 777 package_linux.sh && sudo ./package_linux.sh
|
||||
|
||||
# release tag
|
||||
- name: "Release"
|
||||
if: startsWith(github.ref, 'refs/tags/')
|
||||
uses: softprops/action-gh-release@v2
|
||||
with:
|
||||
files: "tools/generate_emu_config/bin/package/linux/**/*"
|
||||
|
||||
# upload artifact/package if this is a manual run
|
||||
- name: "Upload release package"
|
||||
if: ${{ !startsWith(github.ref, 'refs/tags/') }}
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: "release-generate_emu_config-linux-${{ github.sha }}"
|
||||
path: "tools/generate_emu_config/bin/package/linux/**/*"
|
||||
if-no-files-found: "error"
|
||||
compression-level: 9
|
||||
retention-days: 7
|
||||
|
||||
migrate_gse_script-win:
|
||||
name: "Migrate GSE win"
|
||||
if: ${{ !cancelled() }}
|
||||
|
|
38
README.md
38
README.md
|
@ -30,13 +30,13 @@ This project depends on many third-party libraries and tools, credits to them fo
|
|||
|
||||
# How to use the emu
|
||||
* **Always generate the interfaces file using the `generate_interfaces` tool.**
|
||||
* **Generate the proper app configuration using the `generate_emu_config` tool.**
|
||||
* **If things don't work, try the `ColdClientLoader` setup.**
|
||||
|
||||
You can find helper guides, scripts, and tools here:
|
||||
|
||||
**(These guides, scripts, and tools are maintained by their authors.)**
|
||||
|
||||
* **[GBE Fork Tools](https://github.com/Detanup01/gbe_fork_tools)**
|
||||
* **[GBE-Autoconfigurator](https://github.com/daci12345/GBE-Autoconfigurator)**
|
||||
* **[Semuexec](https://gitlab.com/detiam/Semuexec)**
|
||||
* **[Steam Emu Utility](https://github.com/turusudiro/SteamEmuUtility)**
|
||||
|
@ -276,42 +276,6 @@ An example script `build_linux_premake.sh` is available, check it out
|
|||
|
||||
---
|
||||
|
||||
## **Building the tool `generate_emu_config`**
|
||||
Navigate to the folder `tools/generate_emu_config/` then
|
||||
|
||||
### On Windows:
|
||||
Open CMD then:
|
||||
1. Create python virtual environemnt and install the required packages/dependencies
|
||||
```batch
|
||||
recreate_venv_win.bat
|
||||
```
|
||||
2. Build the tool using `pyinstaller`
|
||||
```batch
|
||||
rebuild_win.bat
|
||||
```
|
||||
|
||||
This will build the tool inside `bin\win`
|
||||
|
||||
### On Linux:
|
||||
Open bash terminal then:
|
||||
1. Create python virtual environemnt and install the required packages/dependencies
|
||||
```shell
|
||||
sudo ./recreate_venv_linux.sh
|
||||
```
|
||||
You might need to edit this script to use a different python version.
|
||||
Find this line and change it:
|
||||
```shell
|
||||
python_package="python3.12"
|
||||
```
|
||||
2. Build the tool using `pyinstaller`
|
||||
```shell
|
||||
./rebuild_linux.sh
|
||||
```
|
||||
|
||||
This will build the tool inside `bin/linux`
|
||||
|
||||
---
|
||||
|
||||
## **Using Github CI as a builder**
|
||||
|
||||
This is really slow and mainly intended for the CI Workflow scripts, but you can use it as another outlet if you can't build locally.
|
||||
|
|
|
@ -1,64 +0,0 @@
|
|||
## What is this ?
|
||||
This is a command line tool to generate the `steam_settings` folder for the emu,
|
||||
you need a Steam account to grab most info, but you can use an anonymous account with limited access to Steam data.
|
||||
|
||||
<br/>
|
||||
|
||||
## Usage
|
||||
```bash
|
||||
generate_emu_config [options] <app id 1> [app id 2] [app id 3] ...
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Available **\[options\]**
|
||||
To get all available options, run the tool without any arguments.
|
||||
|
||||
---
|
||||
|
||||
### Login:
|
||||
You'll be asked each time to enter your username and password, but you can automate this prompt.
|
||||
|
||||
* You can create a file called `my_login.txt` beside this tool with the following data:
|
||||
- Your **username** on the **first** line
|
||||
- Your **password** on the **second** line
|
||||
|
||||
**But beware though of accidentally distributing your login data when using this file**.
|
||||
---
|
||||
* You can define these environment variables, note that these environment variables will override the file `my_login.txt`:
|
||||
- `GSE_CFG_USERNAME`
|
||||
- `GSE_CFG_PASSWORD`
|
||||
|
||||
When defining these environment variables in a script, take care of special characters.
|
||||
|
||||
Example for Windows:
|
||||
```shell
|
||||
set GSE_CFG_USERNAME=my_username
|
||||
set GSE_CFG_PASSWORD=123 abc
|
||||
generate_emu_config.exe 480
|
||||
```
|
||||
|
||||
Example for Linux:
|
||||
```shell
|
||||
export GSE_CFG_USERNAME=my_username
|
||||
export GSE_CFG_PASSWORD=123 abc
|
||||
./generate_emu_config 480
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
### Downloading data for new apps/games and defining extra account IDs:
|
||||
The script uses public Steam IDs (in Steam64 format) of apps/games owners in order to query the required info, such as achievement data.
|
||||
By default, it has a built-in list of public users IDs, and you can extend this list by creating a file called `top_owners_ids.txt` beside the script, then add each new ID in Steam64 format on a separate line.
|
||||
|
||||
When you login with a non-anonymous account, its ID will be added to the top of the list.
|
||||
|
||||
<br/>
|
||||
|
||||
---
|
||||
|
||||
## Attributions and credits
|
||||
|
||||
* Windows icon by: [FroyoShark](https://www.iconarchive.com/artist/froyoshark.html)
|
||||
license: [Creative Commons Attribution 4.0 International](https://creativecommons.org/licenses/by/4.0/)
|
||||
Source: [icon archive: Steam Icon](https://www.iconarchive.com/show/enkel-icons-by-froyoshark/Steam-icon.html)
|
|
@ -1,224 +0,0 @@
|
|||
#controller vdf script by mr_goldberg
|
||||
#generates controller config from a vdf
|
||||
import vdf
|
||||
import sys
|
||||
import os
|
||||
import traceback
|
||||
|
||||
keymap_digital = {
|
||||
"button_a": "A",
|
||||
"button_b": "B",
|
||||
"button_x": "X",
|
||||
"button_y": "Y",
|
||||
"dpad_north": "DUP",
|
||||
"dpad_south": "DDOWN",
|
||||
"dpad_east": "DRIGHT",
|
||||
"dpad_west": "DLEFT",
|
||||
"button_escape": "START",
|
||||
"button_menu": "BACK",
|
||||
"left_bumper": "LBUMPER",
|
||||
"right_bumper": "RBUMPER",
|
||||
"button_back_left": "A",
|
||||
"button_back_right": "X",
|
||||
"button_back_left_upper": "B",
|
||||
"button_back_right_upper": "Y",
|
||||
"": "",
|
||||
"": "",
|
||||
"": "",
|
||||
"": "",
|
||||
}
|
||||
|
||||
def add_input_bindings(group, bindings, force_binding=None, keymap=keymap_digital):
|
||||
if "inputs" not in group:
|
||||
return bindings
|
||||
for i, i_val in group["inputs"].iteritems():
|
||||
for act in i_val.itervalues():
|
||||
for fp in act.itervalues():
|
||||
for bd in fp.itervalues():
|
||||
for bbd, ss in bd.iteritems():
|
||||
if bbd.lower() == 'binding':
|
||||
st = ss.split()
|
||||
supported_binding = False
|
||||
if st[0].lower() == 'game_action':
|
||||
supported_binding = True
|
||||
if st[2][-1] == ",":
|
||||
action_name = st[2][:-1]
|
||||
else:
|
||||
action_name = st[2][:]
|
||||
elif st[0].lower() == 'xinput_button':
|
||||
supported_binding = True
|
||||
if st[1][-1] == ",":
|
||||
action_name = st[1][:-1]
|
||||
else:
|
||||
action_name = st[1][:]
|
||||
|
||||
if supported_binding:
|
||||
if force_binding is None:
|
||||
binding = keymap.get(i.lower(), None)
|
||||
else:
|
||||
binding = force_binding
|
||||
|
||||
if binding:
|
||||
if action_name in bindings:
|
||||
if binding not in bindings[action_name]:
|
||||
bindings[action_name].append(binding)
|
||||
else:
|
||||
bindings[action_name] = [binding]
|
||||
else:
|
||||
print(f"[X] missing keymap for {i}")
|
||||
return bindings
|
||||
|
||||
|
||||
def generate_controller_config(controller_vdf, config_dir):
|
||||
d = vdf.loads(controller_vdf, mapper=vdf.VDFDict, merge_duplicate_keys=False)
|
||||
|
||||
controller_mappings = d["controller_mappings"]
|
||||
|
||||
groups = controller_mappings.get_all_for("group")
|
||||
groups_byid = {}
|
||||
for g in groups:
|
||||
groups_byid[g["id"]] = g
|
||||
|
||||
actions = controller_mappings.get_all_for("actions")
|
||||
action_list = []
|
||||
for a in actions:
|
||||
for k in a:
|
||||
action_list.append(k)
|
||||
|
||||
presets = controller_mappings.get_all_for("preset")
|
||||
all_bindings = {}
|
||||
for p in presets:
|
||||
name = p["name"]
|
||||
if (name not in action_list) and name.lower() != 'default':
|
||||
continue
|
||||
group_bindings = p["group_source_bindings"]
|
||||
bindings = {}
|
||||
for number in group_bindings:
|
||||
s = group_bindings[number].split()
|
||||
if s[1].lower() != "active":
|
||||
continue
|
||||
|
||||
#print(s)
|
||||
if s[0].lower() in ["switch", "button_diamond", "dpad"]:
|
||||
group = groups_byid[number]
|
||||
#print(group)
|
||||
bindings = add_input_bindings(group, bindings)
|
||||
|
||||
if s[0].lower() in ["left_trigger", "right_trigger"]:
|
||||
group = groups_byid[number]
|
||||
if group["mode"].lower() == "trigger":
|
||||
for g in group:
|
||||
if g.lower() == "gameactions":
|
||||
#print(group)
|
||||
action_name = group["gameactions"][name]
|
||||
if s[0].lower() == "left_trigger":
|
||||
binding = "LTRIGGER"
|
||||
else:
|
||||
binding = "RTRIGGER"
|
||||
if action_name in bindings:
|
||||
if binding not in bindings[action_name] and (binding + "=trigger") not in bindings[action_name]:
|
||||
bindings[action_name].insert(0, binding)
|
||||
else:
|
||||
bindings[action_name] = [binding + "=trigger"]
|
||||
if g.lower() == "inputs":
|
||||
if s[0].lower() == "left_trigger":
|
||||
binding = "DLTRIGGER"
|
||||
else:
|
||||
binding = "DRTRIGGER"
|
||||
bindings = add_input_bindings(group, bindings, binding)
|
||||
|
||||
else:
|
||||
print("unhandled trigger mode", group["mode"])
|
||||
|
||||
if s[0].lower() in ["joystick", "right_joystick", "dpad"]:
|
||||
group = groups_byid[number]
|
||||
if group["mode"].lower() == "joystick_move":
|
||||
for g in group:
|
||||
if g.lower() == "gameactions":
|
||||
#print(group)
|
||||
action_name = group["gameactions"][name]
|
||||
if s[0].lower() == "joystick":
|
||||
binding = "LJOY"
|
||||
elif s[0].lower() == "right_joystick":
|
||||
binding = "RJOY"
|
||||
elif s[0].lower() == "dpad":
|
||||
binding = "DPAD"
|
||||
else:
|
||||
print("could not handle", s[0])
|
||||
if action_name in bindings:
|
||||
if binding not in bindings[action_name] and (binding + "=joystick_move") not in bindings[action_name]:
|
||||
bindings[action_name].insert(0, binding)
|
||||
else:
|
||||
bindings[action_name] = [binding + "=joystick_move"]
|
||||
if g.lower() == "inputs":
|
||||
if s[0].lower() == "joystick":
|
||||
binding = "LSTICK"
|
||||
else:
|
||||
binding = "RSTICK"
|
||||
bindings = add_input_bindings(group, bindings, binding)
|
||||
|
||||
elif group["mode"].lower() == "dpad":
|
||||
if s[0].lower() == "joystick":
|
||||
binding_map = {"dpad_north":"DLJOYUP", "dpad_south": "DLJOYDOWN", "dpad_west": "DLJOYLEFT", "dpad_east": "DLJOYRIGHT", "click": "LSTICK"}
|
||||
bindings = add_input_bindings(group, bindings, keymap=binding_map)
|
||||
elif s[0].lower() == "right_joystick":
|
||||
binding_map = {"dpad_north":"DRJOYUP", "dpad_south": "DRJOYDOWN", "dpad_west": "DRJOYLEFT", "dpad_east": "DRJOYRIGHT", "click": "RSTICK"}
|
||||
bindings = add_input_bindings(group, bindings, keymap=binding_map)
|
||||
else:
|
||||
if s[0].lower() != "dpad":
|
||||
print("no pad", s[0])
|
||||
else:
|
||||
print("unhandled joy mode", group["mode"])
|
||||
|
||||
all_bindings[name] = bindings
|
||||
|
||||
#print(controller_mappings["preset"][(0, "group_source_bindings")])
|
||||
|
||||
#print(all_bindings)
|
||||
|
||||
if all_bindings:
|
||||
if not os.path.exists(config_dir):
|
||||
os.makedirs(config_dir)
|
||||
|
||||
for k in all_bindings:
|
||||
with open(os.path.join(config_dir, f'{k}.txt'), 'w', encoding='utf-8') as f:
|
||||
for b in all_bindings[k]:
|
||||
f.write(f"{b}=" + ','.join(all_bindings[k][b]) + "\n")
|
||||
|
||||
|
||||
def help():
|
||||
exe_name = os.path.basename(sys.argv[0])
|
||||
print(f"\nUsage: {exe_name} xbox_controller_config.vdf [xbox360_controller_config.vdf] ... ")
|
||||
print(f" Example: {exe_name} xbox_controller_config.vdf")
|
||||
print(f" Example: {exe_name} xboxone_controller.vdf xbox360_controller.vdf")
|
||||
print("\nAt least 1 .vdf file must be provided\n")
|
||||
|
||||
if __name__ == '__main__':
|
||||
if len(sys.argv) < 2:
|
||||
help()
|
||||
sys.exit(1)
|
||||
|
||||
for vdf_file in sys.argv[1:]:
|
||||
try:
|
||||
print(f"parsing controller file '{vdf_file}'")
|
||||
t = ''
|
||||
with open(vdf_file, 'rb') as f:
|
||||
t = f.read().decode('utf-8')
|
||||
if t:
|
||||
filename = os.path.basename(vdf_file)
|
||||
outdir = os.path.join(f"{filename}_config", "steam_settings", "controller")
|
||||
print(f"output dir: '{outdir}'")
|
||||
generate_controller_config(t, outdir)
|
||||
else:
|
||||
print("[X] couldn't load file", file=sys.stderr)
|
||||
|
||||
print('**********************************\n')
|
||||
except Exception as e:
|
||||
print("Unexpected error:")
|
||||
print(e)
|
||||
print("-----------------------")
|
||||
for line in traceback.format_exception(e):
|
||||
print(line)
|
||||
print('xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n')
|
||||
|
||||
sys.exit(0)
|
|
@ -1,521 +0,0 @@
|
|||
"""
|
||||
Module for deserializing/serializing to and from VDF
|
||||
"""
|
||||
__version__ = "3.4"
|
||||
__author__ = "Rossen Georgiev"
|
||||
|
||||
import re
|
||||
import sys
|
||||
import struct
|
||||
from binascii import crc32
|
||||
from io import BytesIO
|
||||
from io import StringIO as unicodeIO
|
||||
|
||||
try:
|
||||
from collections.abc import Mapping
|
||||
except:
|
||||
from collections import Mapping
|
||||
|
||||
from vdf.vdict import VDFDict
|
||||
|
||||
# Py2 & Py3 compatibility
|
||||
if sys.version_info[0] >= 3:
|
||||
string_type = str
|
||||
int_type = int
|
||||
BOMS = '\ufffe\ufeff'
|
||||
|
||||
def strip_bom(line):
|
||||
return line.lstrip(BOMS)
|
||||
else:
|
||||
from StringIO import StringIO as strIO
|
||||
string_type = basestring
|
||||
int_type = long
|
||||
BOMS = '\xef\xbb\xbf\xff\xfe\xfe\xff'
|
||||
BOMS_UNICODE = '\\ufffe\\ufeff'.decode('unicode-escape')
|
||||
|
||||
def strip_bom(line):
|
||||
return line.lstrip(BOMS if isinstance(line, str) else BOMS_UNICODE)
|
||||
|
||||
# string escaping
|
||||
_unescape_char_map = {
|
||||
r"\n": "\n",
|
||||
r"\t": "\t",
|
||||
r"\v": "\v",
|
||||
r"\b": "\b",
|
||||
r"\r": "\r",
|
||||
r"\f": "\f",
|
||||
r"\a": "\a",
|
||||
r"\\": "\\",
|
||||
r"\?": "?",
|
||||
r"\"": "\"",
|
||||
r"\'": "\'",
|
||||
}
|
||||
_escape_char_map = {v: k for k, v in _unescape_char_map.items()}
|
||||
|
||||
def _re_escape_match(m):
|
||||
return _escape_char_map[m.group()]
|
||||
|
||||
def _re_unescape_match(m):
|
||||
return _unescape_char_map[m.group()]
|
||||
|
||||
def _escape(text):
|
||||
return re.sub(r"[\n\t\v\b\r\f\a\\\?\"']", _re_escape_match, text)
|
||||
|
||||
def _unescape(text):
|
||||
return re.sub(r"(\\n|\\t|\\v|\\b|\\r|\\f|\\a|\\\\|\\\?|\\\"|\\')", _re_unescape_match, text)
|
||||
|
||||
# parsing and dumping for KV1
|
||||
def parse(fp, mapper=dict, merge_duplicate_keys=True, escaped=True):
|
||||
"""
|
||||
Deserialize ``s`` (a ``str`` or ``unicode`` instance containing a VDF)
|
||||
to a Python object.
|
||||
|
||||
``mapper`` specifies the Python object used after deserializetion. ``dict` is
|
||||
used by default. Alternatively, ``collections.OrderedDict`` can be used if you
|
||||
wish to preserve key order. Or any object that acts like a ``dict``.
|
||||
|
||||
``merge_duplicate_keys`` when ``True`` will merge multiple KeyValue lists with the
|
||||
same key into one instead of overwriting. You can se this to ``False`` if you are
|
||||
using ``VDFDict`` and need to preserve the duplicates.
|
||||
"""
|
||||
if not issubclass(mapper, Mapping):
|
||||
raise TypeError("Expected mapper to be subclass of dict, got %s" % type(mapper))
|
||||
if not hasattr(fp, 'readline'):
|
||||
raise TypeError("Expected fp to be a file-like object supporting line iteration")
|
||||
|
||||
stack = [mapper()]
|
||||
expect_bracket = False
|
||||
|
||||
re_keyvalue = re.compile(r'^("(?P<qkey>(?:\\.|[^\\"])*)"|(?P<key>#?[a-z0-9\-\_\\\?$%<>]+))'
|
||||
r'([ \t]*('
|
||||
r'"(?P<qval>(?:\\.|[^\\"])*)(?P<vq_end>")?'
|
||||
r'|(?P<val>(?:(?<!/)/(?!/)|[a-z0-9\-\_\\\?\*\.$<> ])+)'
|
||||
r'|(?P<sblock>{[ \t]*)(?P<eblock>})?'
|
||||
r'))?',
|
||||
flags=re.I)
|
||||
|
||||
for lineno, line in enumerate(fp, 1):
|
||||
if lineno == 1:
|
||||
line = strip_bom(line)
|
||||
|
||||
line = line.lstrip()
|
||||
|
||||
# skip empty and comment lines
|
||||
if line == "" or line[0] == '/':
|
||||
continue
|
||||
|
||||
# one level deeper
|
||||
if line[0] == "{":
|
||||
expect_bracket = False
|
||||
continue
|
||||
|
||||
if expect_bracket:
|
||||
raise SyntaxError("vdf.parse: expected openning bracket",
|
||||
(getattr(fp, 'name', '<%s>' % fp.__class__.__name__), lineno, 1, line))
|
||||
|
||||
# one level back
|
||||
if line[0] == "}":
|
||||
if len(stack) > 1:
|
||||
stack.pop()
|
||||
continue
|
||||
|
||||
raise SyntaxError("vdf.parse: one too many closing parenthasis",
|
||||
(getattr(fp, 'name', '<%s>' % fp.__class__.__name__), lineno, 0, line))
|
||||
|
||||
# parse keyvalue pairs
|
||||
while True:
|
||||
match = re_keyvalue.match(line)
|
||||
|
||||
if not match:
|
||||
try:
|
||||
line += next(fp)
|
||||
continue
|
||||
except StopIteration:
|
||||
raise SyntaxError("vdf.parse: unexpected EOF (open key quote?)",
|
||||
(getattr(fp, 'name', '<%s>' % fp.__class__.__name__), lineno, 0, line))
|
||||
|
||||
key = match.group('key') if match.group('qkey') is None else match.group('qkey')
|
||||
val = match.group('qval')
|
||||
if val is None:
|
||||
val = match.group('val')
|
||||
if val is not None:
|
||||
val = val.rstrip()
|
||||
if val == "":
|
||||
val = None
|
||||
|
||||
if escaped:
|
||||
key = _unescape(key)
|
||||
|
||||
# we have a key with value in parenthesis, so we make a new dict obj (level deeper)
|
||||
if val is None:
|
||||
if merge_duplicate_keys and key in stack[-1]:
|
||||
_m = stack[-1][key]
|
||||
# we've descended a level deeper, if value is str, we have to overwrite it to mapper
|
||||
if not isinstance(_m, mapper):
|
||||
_m = stack[-1][key] = mapper()
|
||||
else:
|
||||
_m = mapper()
|
||||
stack[-1][key] = _m
|
||||
|
||||
if match.group('eblock') is None:
|
||||
# only expect a bracket if it's not already closed or on the same line
|
||||
stack.append(_m)
|
||||
if match.group('sblock') is None:
|
||||
expect_bracket = True
|
||||
|
||||
# we've matched a simple keyvalue pair, map it to the last dict obj in the stack
|
||||
else:
|
||||
# if the value is line consume one more line and try to match again,
|
||||
# until we get the KeyValue pair
|
||||
if match.group('vq_end') is None and match.group('qval') is not None:
|
||||
try:
|
||||
line += next(fp)
|
||||
continue
|
||||
except StopIteration:
|
||||
raise SyntaxError("vdf.parse: unexpected EOF (open quote for value?)",
|
||||
(getattr(fp, 'name', '<%s>' % fp.__class__.__name__), lineno, 0, line))
|
||||
|
||||
stack[-1][key] = _unescape(val) if escaped else val
|
||||
|
||||
# exit the loop
|
||||
break
|
||||
|
||||
if len(stack) != 1:
|
||||
raise SyntaxError("vdf.parse: unclosed parenthasis or quotes (EOF)",
|
||||
(getattr(fp, 'name', '<%s>' % fp.__class__.__name__), lineno, 0, line))
|
||||
|
||||
return stack.pop()
|
||||
|
||||
|
||||
def loads(s, **kwargs):
|
||||
"""
|
||||
Deserialize ``s`` (a ``str`` or ``unicode`` instance containing a JSON
|
||||
document) to a Python object.
|
||||
"""
|
||||
if not isinstance(s, string_type):
|
||||
raise TypeError("Expected s to be a str, got %s" % type(s))
|
||||
|
||||
try:
|
||||
fp = unicodeIO(s)
|
||||
except TypeError:
|
||||
fp = strIO(s)
|
||||
|
||||
return parse(fp, **kwargs)
|
||||
|
||||
|
||||
def load(fp, **kwargs):
|
||||
"""
|
||||
Deserialize ``fp`` (a ``.readline()``-supporting file-like object containing
|
||||
a JSON document) to a Python object.
|
||||
"""
|
||||
return parse(fp, **kwargs)
|
||||
|
||||
|
||||
def dumps(obj, pretty=False, escaped=True):
|
||||
"""
|
||||
Serialize ``obj`` to a VDF formatted ``str``.
|
||||
"""
|
||||
if not isinstance(obj, Mapping):
|
||||
raise TypeError("Expected data to be an instance of``dict``")
|
||||
if not isinstance(pretty, bool):
|
||||
raise TypeError("Expected pretty to be of type bool")
|
||||
if not isinstance(escaped, bool):
|
||||
raise TypeError("Expected escaped to be of type bool")
|
||||
|
||||
return ''.join(_dump_gen(obj, pretty, escaped))
|
||||
|
||||
|
||||
def dump(obj, fp, pretty=False, escaped=True):
|
||||
"""
|
||||
Serialize ``obj`` as a VDF formatted stream to ``fp`` (a
|
||||
``.write()``-supporting file-like object).
|
||||
"""
|
||||
if not isinstance(obj, Mapping):
|
||||
raise TypeError("Expected data to be an instance of``dict``")
|
||||
if not hasattr(fp, 'write'):
|
||||
raise TypeError("Expected fp to have write() method")
|
||||
if not isinstance(pretty, bool):
|
||||
raise TypeError("Expected pretty to be of type bool")
|
||||
if not isinstance(escaped, bool):
|
||||
raise TypeError("Expected escaped to be of type bool")
|
||||
|
||||
for chunk in _dump_gen(obj, pretty, escaped):
|
||||
fp.write(chunk)
|
||||
|
||||
|
||||
def _dump_gen(data, pretty=False, escaped=True, level=0):
|
||||
indent = "\t"
|
||||
line_indent = ""
|
||||
|
||||
if pretty:
|
||||
line_indent = indent * level
|
||||
|
||||
for key, value in data.items():
|
||||
if escaped and isinstance(key, string_type):
|
||||
key = _escape(key)
|
||||
|
||||
if isinstance(value, Mapping):
|
||||
yield '%s"%s"\n%s{\n' % (line_indent, key, line_indent)
|
||||
for chunk in _dump_gen(value, pretty, escaped, level+1):
|
||||
yield chunk
|
||||
yield "%s}\n" % line_indent
|
||||
else:
|
||||
if escaped and isinstance(value, string_type):
|
||||
value = _escape(value)
|
||||
|
||||
yield '%s"%s" "%s"\n' % (line_indent, key, value)
|
||||
|
||||
|
||||
# binary VDF
|
||||
class BASE_INT(int_type):
|
||||
def __repr__(self):
|
||||
return "%s(%d)" % (self.__class__.__name__, self)
|
||||
|
||||
class UINT_64(BASE_INT):
|
||||
pass
|
||||
|
||||
class INT_64(BASE_INT):
|
||||
pass
|
||||
|
||||
class POINTER(BASE_INT):
|
||||
pass
|
||||
|
||||
class COLOR(BASE_INT):
|
||||
pass
|
||||
|
||||
BIN_NONE = b'\x00'
|
||||
BIN_STRING = b'\x01'
|
||||
BIN_INT32 = b'\x02'
|
||||
BIN_FLOAT32 = b'\x03'
|
||||
BIN_POINTER = b'\x04'
|
||||
BIN_WIDESTRING = b'\x05'
|
||||
BIN_COLOR = b'\x06'
|
||||
BIN_UINT64 = b'\x07'
|
||||
BIN_END = b'\x08'
|
||||
BIN_INT64 = b'\x0A'
|
||||
BIN_END_ALT = b'\x0B'
|
||||
|
||||
def binary_loads(b, mapper=dict, merge_duplicate_keys=True, alt_format=False, raise_on_remaining=True):
|
||||
"""
|
||||
Deserialize ``b`` (``bytes`` containing a VDF in "binary form")
|
||||
to a Python object.
|
||||
|
||||
``mapper`` specifies the Python object used after deserializetion. ``dict` is
|
||||
used by default. Alternatively, ``collections.OrderedDict`` can be used if you
|
||||
wish to preserve key order. Or any object that acts like a ``dict``.
|
||||
|
||||
``merge_duplicate_keys`` when ``True`` will merge multiple KeyValue lists with the
|
||||
same key into one instead of overwriting. You can se this to ``False`` if you are
|
||||
using ``VDFDict`` and need to preserve the duplicates.
|
||||
"""
|
||||
if not isinstance(b, bytes):
|
||||
raise TypeError("Expected s to be bytes, got %s" % type(b))
|
||||
|
||||
return binary_load(BytesIO(b), mapper, merge_duplicate_keys, alt_format, raise_on_remaining)
|
||||
|
||||
def binary_load(fp, mapper=dict, merge_duplicate_keys=True, alt_format=False, raise_on_remaining=False):
|
||||
"""
|
||||
Deserialize ``fp`` (a ``.read()``-supporting file-like object containing
|
||||
binary VDF) to a Python object.
|
||||
|
||||
``mapper`` specifies the Python object used after deserializetion. ``dict` is
|
||||
used by default. Alternatively, ``collections.OrderedDict`` can be used if you
|
||||
wish to preserve key order. Or any object that acts like a ``dict``.
|
||||
|
||||
``merge_duplicate_keys`` when ``True`` will merge multiple KeyValue lists with the
|
||||
same key into one instead of overwriting. You can se this to ``False`` if you are
|
||||
using ``VDFDict`` and need to preserve the duplicates.
|
||||
"""
|
||||
if not hasattr(fp, 'read') or not hasattr(fp, 'tell') or not hasattr(fp, 'seek'):
|
||||
raise TypeError("Expected fp to be a file-like object with tell()/seek() and read() returning bytes")
|
||||
if not issubclass(mapper, Mapping):
|
||||
raise TypeError("Expected mapper to be subclass of dict, got %s" % type(mapper))
|
||||
|
||||
# helpers
|
||||
int32 = struct.Struct('<i')
|
||||
uint64 = struct.Struct('<Q')
|
||||
int64 = struct.Struct('<q')
|
||||
float32 = struct.Struct('<f')
|
||||
|
||||
def read_string(fp, wide=False):
|
||||
buf, end = b'', -1
|
||||
offset = fp.tell()
|
||||
|
||||
# locate string end
|
||||
while end == -1:
|
||||
chunk = fp.read(64)
|
||||
|
||||
if chunk == b'':
|
||||
raise SyntaxError("Unterminated cstring (offset: %d)" % offset)
|
||||
|
||||
buf += chunk
|
||||
end = buf.find(b'\x00\x00' if wide else b'\x00')
|
||||
|
||||
if wide:
|
||||
end += end % 2
|
||||
|
||||
# rewind fp
|
||||
fp.seek(end - len(buf) + (2 if wide else 1), 1)
|
||||
|
||||
# decode string
|
||||
result = buf[:end]
|
||||
|
||||
if wide:
|
||||
result = result.decode('utf-16')
|
||||
elif bytes is not str:
|
||||
result = result.decode('utf-8', 'replace')
|
||||
else:
|
||||
try:
|
||||
result.decode('ascii')
|
||||
except:
|
||||
result = result.decode('utf-8', 'replace')
|
||||
|
||||
return result
|
||||
|
||||
stack = [mapper()]
|
||||
CURRENT_BIN_END = BIN_END if not alt_format else BIN_END_ALT
|
||||
|
||||
for t in iter(lambda: fp.read(1), b''):
|
||||
if t == CURRENT_BIN_END:
|
||||
if len(stack) > 1:
|
||||
stack.pop()
|
||||
continue
|
||||
break
|
||||
|
||||
key = read_string(fp)
|
||||
|
||||
if t == BIN_NONE:
|
||||
if merge_duplicate_keys and key in stack[-1]:
|
||||
_m = stack[-1][key]
|
||||
else:
|
||||
_m = mapper()
|
||||
stack[-1][key] = _m
|
||||
stack.append(_m)
|
||||
elif t == BIN_STRING:
|
||||
stack[-1][key] = read_string(fp)
|
||||
elif t == BIN_WIDESTRING:
|
||||
stack[-1][key] = read_string(fp, wide=True)
|
||||
elif t in (BIN_INT32, BIN_POINTER, BIN_COLOR):
|
||||
val = int32.unpack(fp.read(int32.size))[0]
|
||||
|
||||
if t == BIN_POINTER:
|
||||
val = POINTER(val)
|
||||
elif t == BIN_COLOR:
|
||||
val = COLOR(val)
|
||||
|
||||
stack[-1][key] = val
|
||||
elif t == BIN_UINT64:
|
||||
stack[-1][key] = UINT_64(uint64.unpack(fp.read(int64.size))[0])
|
||||
elif t == BIN_INT64:
|
||||
stack[-1][key] = INT_64(int64.unpack(fp.read(int64.size))[0])
|
||||
elif t == BIN_FLOAT32:
|
||||
stack[-1][key] = float32.unpack(fp.read(float32.size))[0]
|
||||
else:
|
||||
raise SyntaxError("Unknown data type at offset %d: %s" % (fp.tell() - 1, repr(t)))
|
||||
|
||||
if len(stack) != 1:
|
||||
raise SyntaxError("Reached EOF, but Binary VDF is incomplete")
|
||||
if raise_on_remaining and fp.read(1) != b'':
|
||||
fp.seek(-1, 1)
|
||||
raise SyntaxError("Binary VDF ended at offset %d, but there is more data remaining" % (fp.tell() - 1))
|
||||
|
||||
return stack.pop()
|
||||
|
||||
def binary_dumps(obj, alt_format=False):
|
||||
"""
|
||||
Serialize ``obj`` to a binary VDF formatted ``bytes``.
|
||||
"""
|
||||
buf = BytesIO()
|
||||
binary_dump(obj, buf, alt_format)
|
||||
return buf.getvalue()
|
||||
|
||||
def binary_dump(obj, fp, alt_format=False):
|
||||
"""
|
||||
Serialize ``obj`` to a binary VDF formatted ``bytes`` and write it to ``fp`` filelike object
|
||||
"""
|
||||
if not isinstance(obj, Mapping):
|
||||
raise TypeError("Expected obj to be type of Mapping")
|
||||
if not hasattr(fp, 'write'):
|
||||
raise TypeError("Expected fp to have write() method")
|
||||
|
||||
for chunk in _binary_dump_gen(obj, alt_format=alt_format):
|
||||
fp.write(chunk)
|
||||
|
||||
def _binary_dump_gen(obj, level=0, alt_format=False):
|
||||
if level == 0 and len(obj) == 0:
|
||||
return
|
||||
|
||||
int32 = struct.Struct('<i')
|
||||
uint64 = struct.Struct('<Q')
|
||||
int64 = struct.Struct('<q')
|
||||
float32 = struct.Struct('<f')
|
||||
|
||||
for key, value in obj.items():
|
||||
if isinstance(key, string_type):
|
||||
key = key.encode('utf-8')
|
||||
else:
|
||||
raise TypeError("dict keys must be of type str, got %s" % type(key))
|
||||
|
||||
if isinstance(value, Mapping):
|
||||
yield BIN_NONE + key + BIN_NONE
|
||||
for chunk in _binary_dump_gen(value, level+1, alt_format=alt_format):
|
||||
yield chunk
|
||||
elif isinstance(value, UINT_64):
|
||||
yield BIN_UINT64 + key + BIN_NONE + uint64.pack(value)
|
||||
elif isinstance(value, INT_64):
|
||||
yield BIN_INT64 + key + BIN_NONE + int64.pack(value)
|
||||
elif isinstance(value, string_type):
|
||||
try:
|
||||
value = value.encode('utf-8') + BIN_NONE
|
||||
yield BIN_STRING
|
||||
except:
|
||||
value = value.encode('utf-16') + BIN_NONE*2
|
||||
yield BIN_WIDESTRING
|
||||
yield key + BIN_NONE + value
|
||||
elif isinstance(value, float):
|
||||
yield BIN_FLOAT32 + key + BIN_NONE + float32.pack(value)
|
||||
elif isinstance(value, (COLOR, POINTER, int, int_type)):
|
||||
if isinstance(value, COLOR):
|
||||
yield BIN_COLOR
|
||||
elif isinstance(value, POINTER):
|
||||
yield BIN_POINTER
|
||||
else:
|
||||
yield BIN_INT32
|
||||
yield key + BIN_NONE
|
||||
yield int32.pack(value)
|
||||
else:
|
||||
raise TypeError("Unsupported type: %s" % type(value))
|
||||
|
||||
yield BIN_END if not alt_format else BIN_END_ALT
|
||||
|
||||
|
||||
def vbkv_loads(s, mapper=dict, merge_duplicate_keys=True):
|
||||
"""
|
||||
Deserialize ``s`` (``bytes`` containing a VBKV to a Python object.
|
||||
|
||||
``mapper`` specifies the Python object used after deserializetion. ``dict` is
|
||||
used by default. Alternatively, ``collections.OrderedDict`` can be used if you
|
||||
wish to preserve key order. Or any object that acts like a ``dict``.
|
||||
|
||||
``merge_duplicate_keys`` when ``True`` will merge multiple KeyValue lists with the
|
||||
same key into one instead of overwriting. You can se this to ``False`` if you are
|
||||
using ``VDFDict`` and need to preserve the duplicates.
|
||||
"""
|
||||
if s[:4] != b'VBKV':
|
||||
raise ValueError("Invalid header")
|
||||
|
||||
checksum, = struct.unpack('<i', s[4:8])
|
||||
|
||||
if checksum != crc32(s[8:]):
|
||||
raise ValueError("Invalid checksum")
|
||||
|
||||
return binary_loads(s[8:], mapper, merge_duplicate_keys, alt_format=True)
|
||||
|
||||
def vbkv_dumps(obj):
|
||||
"""
|
||||
Serialize ``obj`` to a VBKV formatted ``bytes``.
|
||||
"""
|
||||
data = b''.join(_binary_dump_gen(obj, alt_format=True))
|
||||
checksum = crc32(data)
|
||||
|
||||
return b'VBKV' + struct.pack('<i', checksum) + data
|
|
@ -1,221 +0,0 @@
|
|||
import sys
|
||||
from collections import Counter
|
||||
|
||||
if sys.version_info[0] >= 3:
|
||||
_iter_values = 'values'
|
||||
_range = range
|
||||
_string_type = str
|
||||
import collections.abc as _c
|
||||
class _kView(_c.KeysView):
|
||||
def __iter__(self):
|
||||
return self._mapping.iterkeys()
|
||||
class _vView(_c.ValuesView):
|
||||
def __iter__(self):
|
||||
return self._mapping.itervalues()
|
||||
class _iView(_c.ItemsView):
|
||||
def __iter__(self):
|
||||
return self._mapping.iteritems()
|
||||
else:
|
||||
_iter_values = 'itervalues'
|
||||
_range = xrange
|
||||
_string_type = basestring
|
||||
_kView = lambda x: list(x.iterkeys())
|
||||
_vView = lambda x: list(x.itervalues())
|
||||
_iView = lambda x: list(x.iteritems())
|
||||
|
||||
|
||||
class VDFDict(dict):
|
||||
def __init__(self, data=None):
|
||||
"""
|
||||
This is a dictionary that supports duplicate keys and preserves insert order
|
||||
|
||||
``data`` can be a ``dict``, or a sequence of key-value tuples. (e.g. ``[('key', 'value'),..]``)
|
||||
The only supported type for key is str.
|
||||
|
||||
Get/set duplicates is done by tuples ``(index, key)``, where index is the duplicate index
|
||||
for the specified key. (e.g. ``(0, 'key')``, ``(1, 'key')``...)
|
||||
|
||||
When the ``key`` is ``str``, instead of tuple, set will create a duplicate and get will look up ``(0, key)``
|
||||
"""
|
||||
self.__omap = []
|
||||
self.__kcount = Counter()
|
||||
|
||||
if data is not None:
|
||||
if not isinstance(data, (list, dict)):
|
||||
raise ValueError("Expected data to be list of pairs or dict, got %s" % type(data))
|
||||
self.update(data)
|
||||
|
||||
def __repr__(self):
|
||||
out = "%s(" % self.__class__.__name__
|
||||
out += "%s)" % repr(list(self.iteritems()))
|
||||
return out
|
||||
|
||||
def __len__(self):
|
||||
return len(self.__omap)
|
||||
|
||||
def _verify_key_tuple(self, key):
|
||||
if len(key) != 2:
|
||||
raise ValueError("Expected key tuple length to be 2, got %d" % len(key))
|
||||
if not isinstance(key[0], int):
|
||||
raise TypeError("Key index should be an int")
|
||||
if not isinstance(key[1], _string_type):
|
||||
raise TypeError("Key value should be a str")
|
||||
|
||||
def _normalize_key(self, key):
|
||||
if isinstance(key, _string_type):
|
||||
key = (0, key)
|
||||
elif isinstance(key, tuple):
|
||||
self._verify_key_tuple(key)
|
||||
else:
|
||||
raise TypeError("Expected key to be a str or tuple, got %s" % type(key))
|
||||
return key
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
if isinstance(key, _string_type):
|
||||
key = (self.__kcount[key], key)
|
||||
self.__omap.append(key)
|
||||
elif isinstance(key, tuple):
|
||||
self._verify_key_tuple(key)
|
||||
if key not in self:
|
||||
raise KeyError("%s doesn't exist" % repr(key))
|
||||
else:
|
||||
raise TypeError("Expected either a str or tuple for key")
|
||||
super(VDFDict, self).__setitem__(key, value)
|
||||
self.__kcount[key[1]] += 1
|
||||
|
||||
def __getitem__(self, key):
|
||||
return super(VDFDict, self).__getitem__(self._normalize_key(key))
|
||||
|
||||
def __delitem__(self, key):
|
||||
key = self._normalize_key(key)
|
||||
result = super(VDFDict, self).__delitem__(key)
|
||||
|
||||
start_idx = self.__omap.index(key)
|
||||
del self.__omap[start_idx]
|
||||
|
||||
dup_idx, skey = key
|
||||
self.__kcount[skey] -= 1
|
||||
tail_count = self.__kcount[skey] - dup_idx
|
||||
|
||||
if tail_count > 0:
|
||||
for idx in _range(start_idx, len(self.__omap)):
|
||||
if self.__omap[idx][1] == skey:
|
||||
oldkey = self.__omap[idx]
|
||||
newkey = (dup_idx, skey)
|
||||
super(VDFDict, self).__setitem__(newkey, self[oldkey])
|
||||
super(VDFDict, self).__delitem__(oldkey)
|
||||
self.__omap[idx] = newkey
|
||||
|
||||
dup_idx += 1
|
||||
tail_count -= 1
|
||||
if tail_count == 0:
|
||||
break
|
||||
|
||||
if self.__kcount[skey] == 0:
|
||||
del self.__kcount[skey]
|
||||
|
||||
return result
|
||||
|
||||
def __iter__(self):
|
||||
return iter(self.iterkeys())
|
||||
|
||||
def __contains__(self, key):
|
||||
return super(VDFDict, self).__contains__(self._normalize_key(key))
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, VDFDict):
|
||||
return list(self.items()) == list(other.items())
|
||||
else:
|
||||
return False
|
||||
|
||||
def __ne__(self, other):
|
||||
return not self.__eq__(other)
|
||||
|
||||
def clear(self):
|
||||
super(VDFDict, self).clear()
|
||||
self.__kcount.clear()
|
||||
self.__omap = list()
|
||||
|
||||
def get(self, key, *args):
|
||||
return super(VDFDict, self).get(self._normalize_key(key), *args)
|
||||
|
||||
def setdefault(self, key, default=None):
|
||||
if key not in self:
|
||||
self.__setitem__(key, default)
|
||||
return self.__getitem__(key)
|
||||
|
||||
def pop(self, key):
|
||||
key = self._normalize_key(key)
|
||||
value = self.__getitem__(key)
|
||||
self.__delitem__(key)
|
||||
return value
|
||||
|
||||
def popitem(self):
|
||||
if not self.__omap:
|
||||
raise KeyError("VDFDict is empty")
|
||||
key = self.__omap[-1]
|
||||
return key[1], self.pop(key)
|
||||
|
||||
def update(self, data=None, **kwargs):
|
||||
if isinstance(data, dict):
|
||||
data = data.items()
|
||||
elif not isinstance(data, list):
|
||||
raise TypeError("Expected data to be a list or dict, got %s" % type(data))
|
||||
|
||||
for key, value in data:
|
||||
self.__setitem__(key, value)
|
||||
|
||||
def iterkeys(self):
|
||||
return (key[1] for key in self.__omap)
|
||||
|
||||
def keys(self):
|
||||
return _kView(self)
|
||||
|
||||
def itervalues(self):
|
||||
return (self[key] for key in self.__omap)
|
||||
|
||||
def values(self):
|
||||
return _vView(self)
|
||||
|
||||
def iteritems(self):
|
||||
return ((key[1], self[key]) for key in self.__omap)
|
||||
|
||||
def items(self):
|
||||
return _iView(self)
|
||||
|
||||
def get_all_for(self, key):
|
||||
""" Returns all values of the given key """
|
||||
if not isinstance(key, _string_type):
|
||||
raise TypeError("Key needs to be a string.")
|
||||
return [self[(idx, key)] for idx in _range(self.__kcount[key])]
|
||||
|
||||
def remove_all_for(self, key):
|
||||
""" Removes all items with the given key """
|
||||
if not isinstance(key, _string_type):
|
||||
raise TypeError("Key need to be a string.")
|
||||
|
||||
for idx in _range(self.__kcount[key]):
|
||||
super(VDFDict, self).__delitem__((idx, key))
|
||||
|
||||
self.__omap = list(filter(lambda x: x[1] != key, self.__omap))
|
||||
|
||||
del self.__kcount[key]
|
||||
|
||||
def has_duplicates(self):
|
||||
"""
|
||||
Returns ``True`` if the dict contains keys with duplicates.
|
||||
Recurses through any all keys with value that is ``VDFDict``.
|
||||
"""
|
||||
for n in getattr(self.__kcount, _iter_values)():
|
||||
if n != 1:
|
||||
return True
|
||||
|
||||
def dict_recurse(obj):
|
||||
for v in getattr(obj, _iter_values)():
|
||||
if isinstance(v, VDFDict) and v.has_duplicates():
|
||||
return True
|
||||
elif isinstance(v, dict):
|
||||
return dict_recurse(v)
|
||||
return False
|
||||
|
||||
return dict_recurse(self)
|
|
@ -1,173 +0,0 @@
|
|||
import copy
|
||||
import os
|
||||
import time
|
||||
import json
|
||||
|
||||
def __ClosestDictKey(targetKey : str, srcDict : dict[str, object] | set[str]) -> str | None:
|
||||
for k in srcDict:
|
||||
if k.lower() == f"{targetKey}".lower():
|
||||
return k
|
||||
|
||||
return None
|
||||
|
||||
def __generate_ach_watcher_schema(lang: str, app_id: int, achs: list[dict]) -> list[dict]:
|
||||
out_achs_list = []
|
||||
for idx in range(len(achs)):
|
||||
ach = copy.deepcopy(achs[idx])
|
||||
out_ach_data = {}
|
||||
|
||||
# adjust the displayName
|
||||
displayName = ""
|
||||
ach_displayName = ach.get("displayName", None)
|
||||
if ach_displayName:
|
||||
if type(ach_displayName) == dict: # this is a dictionary
|
||||
displayName : str = ach_displayName.get(lang, "")
|
||||
if not displayName and ach_displayName: # has some keys but language not found
|
||||
#print(f'[?] Missing language "{lang}" in "displayName" of achievement {ach["name"]}')
|
||||
nearestLang = __ClosestDictKey(lang, ach_displayName)
|
||||
if nearestLang:
|
||||
#print(f'[?] Best matching language "{nearestLang}"')
|
||||
displayName = ach_displayName[nearestLang]
|
||||
else:
|
||||
print(f'[?] Missing language "{lang}", using displayName from the first language for achievement {ach["name"]}')
|
||||
displayName : str = list(ach_displayName.values())[0]
|
||||
else: # single string (or anything else)
|
||||
displayName = ach_displayName
|
||||
|
||||
del ach["displayName"]
|
||||
else:
|
||||
print(f'[?] Missing "displayName" in achievement {ach["name"]}')
|
||||
|
||||
out_ach_data["displayName"] = displayName
|
||||
|
||||
desc = ""
|
||||
ach_desc = ach.get("description", None)
|
||||
if ach_desc:
|
||||
if type(ach_desc) == dict: # this is a dictionary
|
||||
desc : str = ach_desc.get(lang, "")
|
||||
if not desc and ach_desc: # has some keys but language not found
|
||||
#print(f'[?] Missing language "{lang}" in "description" of achievement {ach["name"]}')
|
||||
nearestLang = __ClosestDictKey(lang, ach_desc)
|
||||
if nearestLang:
|
||||
#print(f'[?] Best matching language "{nearestLang}"')
|
||||
desc = ach_desc[nearestLang]
|
||||
else:
|
||||
print(f'[?] Missing language "{lang}", using description from the first language for achievement {ach["name"]}')
|
||||
desc : str = list(ach_desc.values())[0]
|
||||
else: # single string (or anything else)
|
||||
desc = ach_desc
|
||||
|
||||
del ach["description"]
|
||||
else:
|
||||
print(f'[?] Missing "description" in achievement {ach["name"]}')
|
||||
|
||||
# adjust the description
|
||||
out_ach_data["description"] = desc
|
||||
|
||||
# copy the rest of the data
|
||||
out_ach_data.update(ach)
|
||||
|
||||
# add links to icon, icongray, and icon_gray
|
||||
base_icon_url = r'https://cdn.cloudflare.steamstatic.com/steamcommunity/public/images/apps'
|
||||
icon_hash = out_ach_data.get("icon", None)
|
||||
if icon_hash:
|
||||
out_ach_data["icon"] = f'{base_icon_url}/{app_id}/{icon_hash}'
|
||||
else:
|
||||
out_ach_data["icon"] = ""
|
||||
|
||||
icongray_hash = out_ach_data.get("icongray", None)
|
||||
if icongray_hash:
|
||||
out_ach_data["icongray"] = f'{base_icon_url}/{app_id}/{icongray_hash}'
|
||||
else:
|
||||
out_ach_data["icongray"] = ""
|
||||
|
||||
icon_gray_hash = out_ach_data.get("icon_gray", None)
|
||||
if icon_gray_hash:
|
||||
del out_ach_data["icon_gray"] # use the old key
|
||||
out_ach_data["icongray"] = f'{base_icon_url}/{app_id}/{icon_gray_hash}'
|
||||
|
||||
if "hidden" in out_ach_data:
|
||||
try:
|
||||
out_ach_data["hidden"] = int(out_ach_data["hidden"])
|
||||
except Exception as e:
|
||||
pass
|
||||
else:
|
||||
out_ach_data["hidden"] = 0
|
||||
|
||||
out_achs_list.append(out_ach_data)
|
||||
|
||||
return out_achs_list
|
||||
|
||||
def generate_all_ach_watcher_schemas(
|
||||
base_out_dir : str,
|
||||
appid: int,
|
||||
app_name : str,
|
||||
app_exe : str,
|
||||
achs: list[dict],
|
||||
small_icon_hash : str) -> None:
|
||||
|
||||
ach_watcher_out_dir = os.path.join(base_out_dir, "Achievement Watcher", "steam_cache", "schema")
|
||||
print(f"generating schemas for Achievement Watcher in: {ach_watcher_out_dir}")
|
||||
|
||||
if app_exe:
|
||||
print(f"detected app exe: '{app_exe}'")
|
||||
else:
|
||||
print(f"[X] couldn't detect app exe")
|
||||
|
||||
# if not achs:
|
||||
# print("[X] No achievements were found for Achievement Watcher")
|
||||
# return
|
||||
|
||||
small_icon_url = ''
|
||||
if small_icon_hash:
|
||||
small_icon_url = f"https://cdn.cloudflare.steamstatic.com/steamcommunity/public/images/apps/{appid}/{small_icon_hash}.jpg"
|
||||
images_base_url = r'https://cdn.cloudflare.steamstatic.com/steam/apps'
|
||||
ach_watcher_base_schema = {
|
||||
"appid": appid,
|
||||
"name": app_name,
|
||||
"binary": app_exe,
|
||||
"achievement": {
|
||||
"total": len(achs),
|
||||
},
|
||||
"img": {
|
||||
"header": f"{images_base_url}/{appid}/header.jpg",
|
||||
"background": f"{images_base_url}/{appid}/page_bg_generated_v6b.jpg",
|
||||
"portrait": f"{images_base_url}/{appid}/library_600x900.jpg",
|
||||
"hero": f"{images_base_url}/{appid}/library_hero.jpg",
|
||||
"icon": small_icon_url,
|
||||
},
|
||||
"apiVersion": 1,
|
||||
}
|
||||
|
||||
langs : set[str] = set()
|
||||
for ach in achs:
|
||||
displayNameLangs = ach.get("displayName", None)
|
||||
if displayNameLangs and type(displayNameLangs) == dict:
|
||||
langs.update(list(displayNameLangs.keys()))
|
||||
|
||||
descriptionLangs = ach.get("description", None)
|
||||
if descriptionLangs and type(descriptionLangs) == dict:
|
||||
langs.update(list(descriptionLangs.keys()))
|
||||
|
||||
if "token" in langs:
|
||||
langs.remove("token")
|
||||
|
||||
tokenKey = __ClosestDictKey("token", langs)
|
||||
if tokenKey:
|
||||
langs.remove(tokenKey)
|
||||
|
||||
if not langs:
|
||||
print("[X] Couldn't detect supported languages, assuming English is the only supported language for Achievement Watcher")
|
||||
langs = ["english"]
|
||||
|
||||
for lang in langs:
|
||||
out_schema_folder = os.path.join(ach_watcher_out_dir, lang)
|
||||
if not os.path.exists(out_schema_folder):
|
||||
os.makedirs(out_schema_folder)
|
||||
time.sleep(0.050)
|
||||
|
||||
out_schema = copy.copy(ach_watcher_base_schema)
|
||||
out_schema["achievement"]["list"] = __generate_ach_watcher_schema(lang, appid, achs)
|
||||
out_schema_file = os.path.join(out_schema_folder, f'{appid}.db')
|
||||
with open(out_schema_file, "wt", encoding='utf-8') as f:
|
||||
json.dump(out_schema, f, ensure_ascii=False, indent=2)
|
|
@ -1,234 +0,0 @@
|
|||
import os
|
||||
import sys
|
||||
import traceback
|
||||
import json
|
||||
import queue
|
||||
import threading
|
||||
import time
|
||||
import requests
|
||||
import urllib.parse
|
||||
from external_components import (
|
||||
safe_name
|
||||
)
|
||||
|
||||
def __downloader_thread(q : queue.Queue[tuple[str, str]]):
|
||||
while True:
|
||||
url, path = q.get()
|
||||
if not url:
|
||||
q.task_done()
|
||||
return
|
||||
|
||||
# try 3 times
|
||||
for download_trial in range(3):
|
||||
try:
|
||||
r = requests.get(url)
|
||||
r.raise_for_status()
|
||||
if r.status_code == requests.codes.ok: # if download was successfull
|
||||
with open(path, "wb") as f:
|
||||
f.write(r.content)
|
||||
|
||||
break
|
||||
except Exception as e:
|
||||
print(f"Error downloading from '{url}'", file=sys.stderr)
|
||||
traceback.print_exception(e, file=sys.stderr)
|
||||
|
||||
time.sleep(0.1)
|
||||
|
||||
q.task_done()
|
||||
|
||||
def __remove_url_query(url : str) -> str:
|
||||
url_parts = urllib.parse.urlsplit(url)
|
||||
url_parts_list = list(url_parts)
|
||||
url_parts_list[3] = '' # remove query
|
||||
return str(urllib.parse.urlunsplit(url_parts_list))
|
||||
|
||||
def __download_screenshots(
|
||||
base_out_dir : str,
|
||||
appid : int,
|
||||
app_details : dict,
|
||||
download_screenshots : bool,
|
||||
download_thumbnails : bool):
|
||||
if not download_screenshots and not download_thumbnails:
|
||||
return
|
||||
|
||||
screenshots : list[dict[str, object]] = app_details.get(f'{appid}', {}).get('data', {}).get('screenshots', [])
|
||||
if not screenshots:
|
||||
print(f'[?] no screenshots or thumbnails are available')
|
||||
return
|
||||
|
||||
screenshots_out_dir = os.path.join(base_out_dir, "screenshots")
|
||||
if download_screenshots:
|
||||
print(f"downloading screenshots in: {screenshots_out_dir}")
|
||||
if not os.path.exists(screenshots_out_dir):
|
||||
os.makedirs(screenshots_out_dir)
|
||||
time.sleep(0.025)
|
||||
|
||||
thumbnails_out_dir = os.path.join(screenshots_out_dir, "thumbnails")
|
||||
if download_thumbnails:
|
||||
print(f"downloading screenshots thumbnails in: {thumbnails_out_dir}")
|
||||
if not os.path.exists(thumbnails_out_dir):
|
||||
os.makedirs(thumbnails_out_dir)
|
||||
time.sleep(0.025)
|
||||
|
||||
q : queue.Queue[tuple[str, str]] = queue.Queue()
|
||||
|
||||
max_threads = 20
|
||||
for i in range(max_threads):
|
||||
threading.Thread(target=__downloader_thread, args=(q,), daemon=True).start()
|
||||
|
||||
for scrn in screenshots:
|
||||
if download_screenshots:
|
||||
full_image_url = scrn.get('path_full', None)
|
||||
if full_image_url:
|
||||
full_image_url_sanitized = __remove_url_query(full_image_url)
|
||||
image_hash_name = f'{full_image_url_sanitized.rsplit("/", 1)[-1]}'.rstrip()
|
||||
if image_hash_name:
|
||||
q.put((full_image_url_sanitized, os.path.join(screenshots_out_dir, image_hash_name)))
|
||||
else:
|
||||
print(f'[X] cannot download screenshot from url: "{full_image_url}", failed to get image name')
|
||||
|
||||
if download_thumbnails:
|
||||
thumbnail_url = scrn.get('path_thumbnail', None)
|
||||
if thumbnail_url:
|
||||
thumbnail_url_sanitized = __remove_url_query(thumbnail_url)
|
||||
image_hash_name = f'{thumbnail_url_sanitized.rsplit("/", 1)[-1]}'.rstrip()
|
||||
if image_hash_name:
|
||||
q.put((thumbnail_url_sanitized, os.path.join(thumbnails_out_dir, image_hash_name)))
|
||||
else:
|
||||
print(f'[X] cannot download screenshot thumbnail from url: "{thumbnail_url}", failed to get image name')
|
||||
|
||||
q.join()
|
||||
|
||||
for i in range(max_threads):
|
||||
q.put((None, None))
|
||||
|
||||
q.join()
|
||||
|
||||
print(f"finished downloading app screenshots")
|
||||
|
||||
PREFERED_VIDS = [
|
||||
'trailer', 'gameplay', 'announcement'
|
||||
]
|
||||
|
||||
def __download_videos(base_out_dir : str, appid : int, app_details : dict):
|
||||
videos : list[dict[str, object]] = app_details.get(f'{appid}', {}).get('data', {}).get('movies', [])
|
||||
if not videos:
|
||||
print(f'[?] no videos were found')
|
||||
return
|
||||
|
||||
videos_out_dir = os.path.join(base_out_dir, "videos")
|
||||
print(f"downloading app videos in: {videos_out_dir}")
|
||||
|
||||
first_vid : tuple[str, str] = None
|
||||
prefered_vid : tuple[str, str] = None
|
||||
for vid in videos:
|
||||
vid_name = f"{vid.get('name', '')}"
|
||||
webm_url = vid.get('webm', {}).get("480", None)
|
||||
mp4_url = vid.get('mp4', {}).get("480", None)
|
||||
|
||||
ext : str = None
|
||||
prefered_url : str = None
|
||||
if mp4_url:
|
||||
prefered_url = mp4_url
|
||||
ext = 'mp4'
|
||||
elif webm_url:
|
||||
prefered_url = webm_url
|
||||
ext = 'webm'
|
||||
else: # no url is found
|
||||
print(f'[X] no url is found for video "{vid_name}"')
|
||||
continue
|
||||
|
||||
vid_url_sanitized = __remove_url_query(prefered_url)
|
||||
vid_name_in_url = f'{vid_url_sanitized.rsplit("/", 1)[-1]}'.rstrip()
|
||||
vid_name = safe_name.create_safe_name(vid_name)
|
||||
if vid_name:
|
||||
vid_name = f'{vid_name}.{ext}'
|
||||
else:
|
||||
vid_name = vid_name_in_url
|
||||
|
||||
if vid_name:
|
||||
if not first_vid:
|
||||
first_vid = (vid_url_sanitized, vid_name)
|
||||
|
||||
if any(vid_name.lower().find(candidate) > -1 for candidate in PREFERED_VIDS):
|
||||
prefered_vid = (vid_url_sanitized, vid_name)
|
||||
|
||||
if prefered_vid:
|
||||
break
|
||||
else:
|
||||
print(f'[X] cannot download video from url: "{prefered_url}", failed to get vido name')
|
||||
|
||||
if not first_vid and not prefered_vid:
|
||||
print(f'[X] no video url could be found')
|
||||
return
|
||||
elif not prefered_vid:
|
||||
prefered_vid = first_vid
|
||||
|
||||
if not os.path.exists(videos_out_dir):
|
||||
os.makedirs(videos_out_dir)
|
||||
time.sleep(0.05)
|
||||
|
||||
q : queue.Queue[tuple[str, str]] = queue.Queue()
|
||||
|
||||
max_threads = 1
|
||||
for i in range(max_threads):
|
||||
threading.Thread(target=__downloader_thread, args=(q,), daemon=True).start()
|
||||
|
||||
# TODO download all videos
|
||||
print(f'donwloading video: "{prefered_vid[1]}"')
|
||||
q.put((prefered_vid[0], os.path.join(videos_out_dir, prefered_vid[1])))
|
||||
q.join()
|
||||
|
||||
for i in range(max_threads):
|
||||
q.put((None, None))
|
||||
|
||||
q.join()
|
||||
|
||||
print(f"finished downloading app videos")
|
||||
|
||||
|
||||
def download_app_details(
|
||||
base_out_dir : str,
|
||||
info_out_dir : str,
|
||||
appid : int,
|
||||
download_screenshots : bool,
|
||||
download_thumbnails : bool,
|
||||
download_vids : bool):
|
||||
|
||||
details_out_file = os.path.join(info_out_dir, "app_details.json")
|
||||
print(f"downloading app details in: {details_out_file}")
|
||||
|
||||
app_details : dict = None
|
||||
last_exception : Exception | str = None
|
||||
# try 3 times
|
||||
for download_trial in range(3):
|
||||
try:
|
||||
r = requests.get(f'http://store.steampowered.com/api/appdetails?appids={appid}&format=json')
|
||||
if r.status_code == requests.codes.ok: # if download was successfull
|
||||
result : dict = r.json()
|
||||
json_ok = result.get(f'{appid}', {}).get('success', False)
|
||||
if json_ok:
|
||||
app_details = result
|
||||
break
|
||||
else:
|
||||
last_exception = "JSON success was False"
|
||||
except Exception as e:
|
||||
last_exception = e
|
||||
|
||||
time.sleep(0.1)
|
||||
|
||||
if not app_details:
|
||||
err = "[X] failed to download app details"
|
||||
if last_exception:
|
||||
err += f', last error: "{last_exception}"'
|
||||
|
||||
print(err)
|
||||
return
|
||||
|
||||
with open(details_out_file, "wt", encoding='utf-8') as f:
|
||||
json.dump(app_details, f, ensure_ascii=False, indent=2)
|
||||
|
||||
__download_screenshots(base_out_dir, appid, app_details, download_screenshots, download_thumbnails)
|
||||
|
||||
if download_vids:
|
||||
__download_videos(base_out_dir, appid, app_details)
|
|
@ -1,94 +0,0 @@
|
|||
import os
|
||||
import threading
|
||||
import time
|
||||
import requests
|
||||
|
||||
|
||||
def download_app_images(
|
||||
base_out_dir : str,
|
||||
appid : int,
|
||||
clienticon : str,
|
||||
icon : str,
|
||||
logo : str,
|
||||
logo_small : str):
|
||||
|
||||
icons_out_dir = os.path.join(base_out_dir, "images")
|
||||
print(f"downloading common app images in: {icons_out_dir}")
|
||||
|
||||
def downloader_thread(image_name : str, image_url : str):
|
||||
# try 3 times
|
||||
for download_trial in range(3):
|
||||
try:
|
||||
r = requests.get(image_url)
|
||||
if r.status_code == requests.codes.ok: # if download was successfull
|
||||
with open(os.path.join(icons_out_dir, image_name), "wb") as f:
|
||||
f.write(r.content)
|
||||
|
||||
break
|
||||
except Exception as ex:
|
||||
pass
|
||||
|
||||
time.sleep(0.1)
|
||||
|
||||
app_images_names = [
|
||||
r'capsule_184x69.jpg',
|
||||
r'capsule_231x87.jpg',
|
||||
r'capsule_231x87_alt_assets_0.jpg',
|
||||
r'capsule_467x181.jpg',
|
||||
r'capsule_616x353.jpg',
|
||||
r'capsule_616x353_alt_assets_0.jpg',
|
||||
r'library_600x900.jpg',
|
||||
r'library_600x900_2x.jpg',
|
||||
r'library_hero.jpg',
|
||||
r'broadcast_left_panel.jpg',
|
||||
r'broadcast_right_panel.jpg',
|
||||
r'page.bg.jpg',
|
||||
r'page_bg_raw.jpg',
|
||||
r'page_bg_generated.jpg',
|
||||
r'page_bg_generated_v6b.jpg',
|
||||
r'header.jpg',
|
||||
r'header_alt_assets_0.jpg',
|
||||
r'hero_capsule.jpg',
|
||||
r'logo.png',
|
||||
]
|
||||
|
||||
if not os.path.exists(icons_out_dir):
|
||||
os.makedirs(icons_out_dir)
|
||||
time.sleep(0.050)
|
||||
|
||||
threads_list : list[threading.Thread] = []
|
||||
for image_name in app_images_names:
|
||||
image_url = f'https://cdn.cloudflare.steamstatic.com/steam/apps/{appid}/{image_name}'
|
||||
t = threading.Thread(target=downloader_thread, args=(image_name, image_url), daemon=True)
|
||||
threads_list.append(t)
|
||||
t.start()
|
||||
|
||||
community_images_url = f'https://cdn.cloudflare.steamstatic.com/steamcommunity/public/images/apps/{appid}'
|
||||
if clienticon:
|
||||
image_url = f'{community_images_url}/{clienticon}.ico'
|
||||
t = threading.Thread(target=downloader_thread, args=('clienticon.ico', image_url), daemon=True)
|
||||
threads_list.append(t)
|
||||
t.start()
|
||||
|
||||
if icon:
|
||||
image_url = f'{community_images_url}/{icon}.jpg'
|
||||
t = threading.Thread(target=downloader_thread, args=('icon.jpg', image_url), daemon=True)
|
||||
threads_list.append(t)
|
||||
t.start()
|
||||
|
||||
if logo:
|
||||
image_url = f'{community_images_url}/{logo}.jpg'
|
||||
t = threading.Thread(target=downloader_thread, args=('logo.jpg', image_url), daemon=True)
|
||||
threads_list.append(t)
|
||||
t.start()
|
||||
|
||||
if logo_small:
|
||||
image_url = f'{community_images_url}/{logo_small}.jpg'
|
||||
t = threading.Thread(target=downloader_thread, args=('logo_small.jpg', image_url), daemon=True)
|
||||
threads_list.append(t)
|
||||
t.start()
|
||||
|
||||
for t in threads_list:
|
||||
t.join()
|
||||
|
||||
print(f"finished downloading common app images")
|
|
@ -1,157 +0,0 @@
|
|||
import os
|
||||
|
||||
__cdx_ini = '''
|
||||
### мллллл м
|
||||
### Алллл плл лВ ппплллллллм пппппллВллм мВлллп
|
||||
### Блллп Бллп ппллллА пллл Блллп
|
||||
### Вллл п ллВ плллБ АллВллл
|
||||
### Вллл млллллм ллл пллл мллллллм Бллллл
|
||||
### лллА Аллллп плВ ллл лллВллВ Алл лллВллл
|
||||
### Бллл ллллА лл ллл Алллллллллллп лллБ Бллл
|
||||
### Алллм мллпВллм млл Влл лллБлллА млллА Алллм
|
||||
### плллллп плллВп ллп АлллА плллллллВлп пВллм
|
||||
### мллллллБ
|
||||
### пппллВмммммлВлллВпп
|
||||
###
|
||||
###
|
||||
### Game data is stored at %SystemDrive%\\Users\\Public\\Documents\\Steam\\CODEX\\{cdx_id}
|
||||
###
|
||||
|
||||
[Settings]
|
||||
###
|
||||
### Game identifier (http://store.steampowered.com/app/{cdx_id})
|
||||
###
|
||||
AppId={cdx_id}
|
||||
###
|
||||
### Steam Account ID, set it to 0 to get a random Account ID
|
||||
###
|
||||
#AccountId=0
|
||||
###
|
||||
### Name of the current player
|
||||
###
|
||||
UserName=Player2
|
||||
###
|
||||
### Language that will be used in the game
|
||||
###
|
||||
Language=english
|
||||
###
|
||||
### Enable lobby mode
|
||||
###
|
||||
LobbyEnabled=1
|
||||
###
|
||||
### Lobby port to listen on
|
||||
###
|
||||
#LobbyPort=31183
|
||||
###
|
||||
### Enable/Disable Steam overlay
|
||||
###
|
||||
Overlays=1
|
||||
###
|
||||
### Set Steam connection to offline mode
|
||||
###
|
||||
Offline=0
|
||||
###
|
||||
|
||||
[Interfaces]
|
||||
###
|
||||
### Steam Client API interface versions
|
||||
###
|
||||
SteamAppList=STEAMAPPLIST_INTERFACE_VERSION001
|
||||
SteamApps=STEAMAPPS_INTERFACE_VERSION008
|
||||
SteamClient=SteamClient017
|
||||
SteamController=SteamController008
|
||||
SteamFriends=SteamFriends017
|
||||
SteamGameServer=SteamGameServer013
|
||||
SteamGameServerStats=SteamGameServerStats001
|
||||
SteamHTMLSurface=STEAMHTMLSURFACE_INTERFACE_VERSION_005
|
||||
SteamHTTP=STEAMHTTP_INTERFACE_VERSION003
|
||||
SteamInput=SteamInput002
|
||||
SteamInventory=STEAMINVENTORY_INTERFACE_V003
|
||||
SteamMatchGameSearch=SteamMatchGameSearch001
|
||||
SteamMatchMaking=SteamMatchMaking009
|
||||
SteamMatchMakingServers=SteamMatchMakingServers002
|
||||
SteamMusic=STEAMMUSIC_INTERFACE_VERSION001
|
||||
SteamMusicRemote=STEAMMUSICREMOTE_INTERFACE_VERSION001
|
||||
SteamNetworking=SteamNetworking006
|
||||
SteamNetworkingSockets=SteamNetworkingSockets008
|
||||
SteamNetworkingUtils=SteamNetworkingUtils003
|
||||
SteamParentalSettings=STEAMPARENTALSETTINGS_INTERFACE_VERSION001
|
||||
SteamParties=SteamParties002
|
||||
SteamRemotePlay=STEAMREMOTEPLAY_INTERFACE_VERSION001
|
||||
SteamRemoteStorage=STEAMREMOTESTORAGE_INTERFACE_VERSION014
|
||||
SteamScreenshots=STEAMSCREENSHOTS_INTERFACE_VERSION003
|
||||
SteamTV=STEAMTV_INTERFACE_V001
|
||||
SteamUGC=STEAMUGC_INTERFACE_VERSION015
|
||||
SteamUser=SteamUser021
|
||||
SteamUserStats=STEAMUSERSTATS_INTERFACE_VERSION012
|
||||
SteamUtils=SteamUtils010
|
||||
SteamVideo=STEAMVIDEO_INTERFACE_V002
|
||||
###
|
||||
|
||||
[DLC]
|
||||
###
|
||||
### Automatically unlock all DLCs
|
||||
###
|
||||
DLCUnlockall=0
|
||||
###
|
||||
### Identifiers for DLCs
|
||||
###
|
||||
#ID=Name
|
||||
{cdx_dlc_list}
|
||||
###
|
||||
|
||||
[AchievementIcons]
|
||||
###
|
||||
### Bitmap Icons for Achievements
|
||||
###
|
||||
#halloween_8 Achieved=steam_settings\\img\\halloween_8.jpg
|
||||
#halloween_8 Unachieved=steam_settings\\img\\unachieved\\halloween_8.jpg
|
||||
{cdx_ach_list}
|
||||
###
|
||||
|
||||
[Crack]
|
||||
00ec7837693245e3=b7d5bc716512b5d6
|
||||
|
||||
'''
|
||||
|
||||
|
||||
def generate_cdx_ini(
|
||||
base_out_dir : str,
|
||||
appid: int,
|
||||
dlc: list[tuple[int, str]],
|
||||
achs: list[dict]) -> None:
|
||||
|
||||
cdx_ini_path = os.path.join(base_out_dir, "steam_emu.ini")
|
||||
print(f"generating steam_emu.ini for CODEX emulator in: {cdx_ini_path}")
|
||||
|
||||
dlc_list = [f"{d[0]}={d[1]}" for d in dlc]
|
||||
achs_list = []
|
||||
for ach in achs:
|
||||
icon = ach.get("icon", None)
|
||||
if icon:
|
||||
icon = f"steam_settings\\img\\{icon}"
|
||||
else:
|
||||
icon = 'steam_settings\\img\\steam_default_icon_unlocked.jpg'
|
||||
|
||||
icon_gray = ach.get("icon_gray", None)
|
||||
if icon_gray:
|
||||
icon_gray = f"steam_settings\\img\\{icon_gray}"
|
||||
else:
|
||||
icon_gray = 'steam_settings\\img\\steam_default_icon_locked.jpg'
|
||||
|
||||
icongray = ach.get("icongray", None)
|
||||
if icongray:
|
||||
icon_gray = f"steam_settings\\img\\{icongray}"
|
||||
|
||||
achs_list.append(f'{ach["name"]} Achieved={icon}') # unlocked
|
||||
achs_list.append(f'{ach["name"]} Unachieved={icon_gray}') # locked
|
||||
|
||||
formatted_ini = __cdx_ini.format(
|
||||
cdx_id = appid,
|
||||
cdx_dlc_list = "\n".join(dlc_list),
|
||||
cdx_ach_list = "\n".join(achs_list)
|
||||
)
|
||||
|
||||
with open(cdx_ini_path, "wt", encoding='utf-8') as f:
|
||||
f.writelines(formatted_ini)
|
||||
|
|
@ -1,22 +0,0 @@
|
|||
|
||||
import re
|
||||
|
||||
|
||||
ALLOWED_CHARS = set([
|
||||
'`', '~', '!', '@',
|
||||
'#', '$', '%', '&',
|
||||
'(', ')', '-', '_',
|
||||
'=', '+', '[', '{',
|
||||
']', '}', ';', '\'',
|
||||
',', '.', ' ', '\t',
|
||||
'®', '™',
|
||||
])
|
||||
|
||||
def create_safe_name(app_name : str):
|
||||
safe_name = ''.join(c for c in f'{app_name}' if c.isalnum() or c in ALLOWED_CHARS)\
|
||||
.rstrip()\
|
||||
.rstrip('.')\
|
||||
.replace('\t', ' ')
|
||||
safe_name = re.sub('\s\s+', ' ', safe_name)
|
||||
return safe_name
|
||||
|
File diff suppressed because it is too large
Load diff
Binary file not shown.
Before Width: | Height: | Size: 178 KiB |
|
@ -1,3 +0,0 @@
|
|||
Icon by: [FroyoShark](https://www.iconarchive.com/artist/froyoshark.html)
|
||||
License: [Creative Commons Attribution 4.0 International](https://creativecommons.org/licenses/by/4.0/)
|
||||
Source: [icon archive: Steam Icon](https://www.iconarchive.com/show/enkel-icons-by-froyoshark/Steam-icon.html)
|
|
@ -1,28 +0,0 @@
|
|||
#!/usr/bin/env bash
|
||||
|
||||
|
||||
if [ "$(id -u)" -ne 0 ]; then
|
||||
echo "Please run as root" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
build_dir="bin/linux"
|
||||
out_dir="bin/package/linux"
|
||||
script_dir=$( cd -- "$( dirname -- "${0}" )" &> /dev/null && pwd )
|
||||
|
||||
[[ -d "$script_dir/$build_dir" ]] || {
|
||||
echo "[X] build folder wasn't found" >&2
|
||||
exit 1
|
||||
}
|
||||
|
||||
apt update || exit 1
|
||||
apt install tar -y || exit 1
|
||||
|
||||
mkdir -p "$script_dir/$out_dir"
|
||||
|
||||
archive_file="$script_dir/$out_dir/generate_emu_config-linux.tar.bz2"
|
||||
[[ -f "$archive_file" ]] && rm -f "$archive_file"
|
||||
|
||||
pushd "$script_dir/$build_dir"
|
||||
tar -c -j -vf "$archive_file" $(ls -d */)
|
||||
popd
|
|
@ -1,52 +0,0 @@
|
|||
@echo off
|
||||
setlocal EnableDelayedExpansion
|
||||
cd /d "%~dp0"
|
||||
|
||||
set /a "MAX_THREADS=2"
|
||||
if defined NUMBER_OF_PROCESSORS (
|
||||
:: use 70%
|
||||
set /a "MAX_THREADS=%NUMBER_OF_PROCESSORS% * 70 / 100"
|
||||
if %MAX_THREADS% lss 1 (
|
||||
set /a "MAX_THREADS=1"
|
||||
)
|
||||
)
|
||||
|
||||
set "ROOT=%cd%"
|
||||
set "BUILD_DIR=%ROOT%\bin\win"
|
||||
set "OUT_DIR=%ROOT%\bin\package\win"
|
||||
|
||||
set /a "PKG_EXE_MEM_PERCENT=90"
|
||||
set /a "PKG_EXE_DICT_SIZE_MB=384"
|
||||
set "PKG_EXE=..\..\third-party\deps\win\7za\7za.exe"
|
||||
if not exist "%PKG_EXE%" (
|
||||
1>&2 echo:packager wasn't found
|
||||
goto :end_script_with_err
|
||||
)
|
||||
|
||||
if not exist "%BUILD_DIR%" (
|
||||
1>&2 echo:build folder wasn't found
|
||||
goto :end_script_with_err
|
||||
)
|
||||
|
||||
if not exist "%OUT_DIR%" (
|
||||
mkdir "%OUT_DIR%"
|
||||
)
|
||||
|
||||
set "ACHIVE_FILE=%OUT_DIR%\generate_emu_config-win.7z"
|
||||
if exist "%ACHIVE_FILE%" (
|
||||
del /f /q "%ACHIVE_FILE%"
|
||||
)
|
||||
|
||||
call "%PKG_EXE%" a "%ACHIVE_FILE%" "%BUILD_DIR%\*" -t7z -slp -ssw -mx -myx -mmemuse=p%PKG_EXE_MEM_PERCENT% -ms=on -mqs=off -mf=on -mhc+ -mhe- -m0=LZMA2:d=%PKG_EXE_DICT_SIZE_MB%m -mmt=%MAX_THREADS% -mmtf+ -mtm- -mtc- -mta- -mtr+ || (
|
||||
goto :end_script_with_err
|
||||
)
|
||||
|
||||
goto :end_script
|
||||
|
||||
:end_script
|
||||
endlocal
|
||||
exit /b 0
|
||||
|
||||
:end_script_with_err
|
||||
endlocal
|
||||
exit /b 1
|
|
@ -1,40 +0,0 @@
|
|||
#!/usr/bin/env bash
|
||||
|
||||
|
||||
venv=".env-linux"
|
||||
out_dir="bin/linux"
|
||||
build_temp_dir="bin/tmp/linux"
|
||||
|
||||
[[ -d "$out_dir" ]] && rm -r -f "$out_dir"
|
||||
mkdir -p "$out_dir"
|
||||
|
||||
[[ -d "$build_temp_dir" ]] && rm -r -f "$build_temp_dir"
|
||||
|
||||
rm -f *.spec
|
||||
|
||||
chmod 777 "./$venv/bin/activate"
|
||||
source "./$venv/bin/activate"
|
||||
|
||||
echo building generate_emu_config...
|
||||
pyinstaller "generate_emu_config.py" --distpath "$out_dir" -y --clean --onedir --name "generate_emu_config" --noupx --console -i "NONE" --collect-submodules "steam" --workpath "$build_temp_dir" --specpath "$build_temp_dir" || exit 1
|
||||
|
||||
echo building parse_controller_vdf...
|
||||
pyinstaller "controller_config_generator/parse_controller_vdf.py" --distpath "$out_dir" -y --clean --onedir --name "parse_controller_vdf" --noupx --console -i "NONE" --workpath "$build_temp_dir" --specpath "$build_temp_dir" || exit 1
|
||||
|
||||
echo building parse_achievements_schema...
|
||||
pyinstaller "stats_schema_achievement_gen/achievements_gen.py" --distpath "$out_dir" -y --clean --onedir --name "parse_achievements_schema" --noupx --console -i "NONE" --workpath "$build_temp_dir" --specpath "$build_temp_dir" || exit 1
|
||||
|
||||
cp -f "steam_default_icon_locked.jpg" "$out_dir/generate_emu_config"
|
||||
cp -f "steam_default_icon_unlocked.jpg" "$out_dir/generate_emu_config"
|
||||
cp -f "README.md" "$out_dir/generate_emu_config"
|
||||
echo "Check the README" > "$out_dir/generate_emu_config/my_login.EXAMPLE.txt"
|
||||
echo "Check the README" > "$out_dir/generate_emu_config/top_owners_ids.EXAMPLE.txt"
|
||||
echo "You can use a website like: https://steamladder.com/games/" >> "$out_dir/generate_emu_config/top_owners_ids.EXAMPLE.txt"
|
||||
|
||||
echo;
|
||||
echo =============
|
||||
echo Built inside: "$out_dir/"
|
||||
|
||||
[[ -d "$build_temp_dir" ]] && rm -r -f "$build_temp_dir"
|
||||
|
||||
deactivate
|
|
@ -1,71 +0,0 @@
|
|||
@echo off
|
||||
setlocal EnableDelayedExpansion
|
||||
cd /d "%~dp0"
|
||||
|
||||
set "ROOT=%cd%"
|
||||
set "VENV=%ROOT%\.env-win"
|
||||
set "OUT_DIR=%ROOT%\bin\win"
|
||||
set "BUILD_TEMP_DIR=%ROOT%\bin\tmp\win"
|
||||
set "ICON_FILE=%ROOT%\icon\Froyoshark-Enkel-Steam.ico"
|
||||
|
||||
set /a "LAST_ERR_CODE=0"
|
||||
|
||||
set "SIGNER_TOOL=..\..\third-party\build\win\cert\sign_helper.bat"
|
||||
if not exist "%SIGNER_TOOL%" (
|
||||
1>&2 echo:signing tool wasn't found
|
||||
set /a "LAST_ERR_CODE=1"
|
||||
goto :end_script
|
||||
)
|
||||
|
||||
if exist "%OUT_DIR%" (
|
||||
rmdir /s /q "%OUT_DIR%"
|
||||
)
|
||||
mkdir "%OUT_DIR%"
|
||||
|
||||
if exist "%BUILD_TEMP_DIR%" (
|
||||
rmdir /s /q "%BUILD_TEMP_DIR%"
|
||||
)
|
||||
|
||||
call "%VENV%\Scripts\activate.bat"
|
||||
|
||||
echo:building generate_emu_config...
|
||||
pyinstaller "generate_emu_config.py" --distpath "%OUT_DIR%" -y --clean --onedir --name "generate_emu_config" --noupx --console -i "%ICON_FILE%" --collect-submodules "steam" --workpath "%BUILD_TEMP_DIR%" --specpath "%BUILD_TEMP_DIR%" || (
|
||||
set /a "LAST_ERR_CODE=1"
|
||||
goto :end_script
|
||||
)
|
||||
call "%SIGNER_TOOL%" "%OUT_DIR%\generate_emu_config\generate_emu_config.exe"
|
||||
|
||||
echo:building parse_controller_vdf...
|
||||
pyinstaller "controller_config_generator\parse_controller_vdf.py" --distpath "%OUT_DIR%" -y --clean --onedir --name "parse_controller_vdf" --noupx --console -i "NONE" --workpath "%BUILD_TEMP_DIR%" --specpath "%BUILD_TEMP_DIR%" || (
|
||||
set /a "LAST_ERR_CODE=1"
|
||||
goto :end_script
|
||||
)
|
||||
call "%SIGNER_TOOL%" "%OUT_DIR%\parse_controller_vdf\parse_controller_vdf.exe"
|
||||
|
||||
echo:building parse_achievements_schema...
|
||||
pyinstaller "stats_schema_achievement_gen\achievements_gen.py" --distpath "%OUT_DIR%" -y --clean --onedir --name "parse_achievements_schema" --noupx --console -i "NONE" --workpath "%BUILD_TEMP_DIR%" --specpath "%BUILD_TEMP_DIR%" || (
|
||||
set /a "LAST_ERR_CODE=1"
|
||||
goto :end_script
|
||||
)
|
||||
call "%SIGNER_TOOL%" "%OUT_DIR%\parse_achievements_schema\parse_achievements_schema.exe"
|
||||
|
||||
copy /y "steam_default_icon_locked.jpg" "%OUT_DIR%\generate_emu_config\"
|
||||
copy /y "steam_default_icon_unlocked.jpg" "%OUT_DIR%\generate_emu_config\"
|
||||
copy /y "README.md" "%OUT_DIR%\generate_emu_config\"
|
||||
echo Check the README>> "%OUT_DIR%\generate_emu_config\my_login.EXAMPLE.txt"
|
||||
echo Check the README>> "%OUT_DIR%\generate_emu_config\top_owners_ids.EXAMPLE.txt"
|
||||
echo You can use a website like: https://steamladder.com/games/>> "%OUT_DIR%\generate_emu_config\top_owners_ids.EXAMPLE.txt"
|
||||
|
||||
echo:
|
||||
echo:=============
|
||||
echo:Built inside: "%OUT_DIR%\"
|
||||
|
||||
goto :end_script
|
||||
|
||||
:end_script
|
||||
if exist "%BUILD_TEMP_DIR%" (
|
||||
rmdir /s /q "%BUILD_TEMP_DIR%"
|
||||
)
|
||||
|
||||
endlocal
|
||||
exit /b %LAST_ERR_CODE%
|
|
@ -1,35 +0,0 @@
|
|||
#!/usr/bin/env bash
|
||||
|
||||
|
||||
if [ "$(id -u)" -ne 0 ]; then
|
||||
echo "Please run as root" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
python_package="python3.12"
|
||||
venv=".env-linux"
|
||||
reqs_file="requirements.txt"
|
||||
script_dir=$( cd -- "$( dirname -- "${0}" )" &> /dev/null && pwd )
|
||||
|
||||
apt update -y || exit 1
|
||||
apt install software-properties-common -y
|
||||
add-apt-repository ppa:deadsnakes/ppa -y
|
||||
apt update -y || exit 1
|
||||
apt install "$python_package" -y || exit 1
|
||||
apt install "$python_package-dev" -y || exit 1
|
||||
apt install "$python_package-venv" -y || exit 1
|
||||
apt install python3-dev -y || exit 1
|
||||
|
||||
[[ -d "$script_dir/$venv" ]] && rm -r -f "$script_dir/$venv"
|
||||
|
||||
$python_package -m venv "$script_dir/$venv" || exit 1
|
||||
sleep 1
|
||||
|
||||
chmod 777 "$script_dir/$venv/bin/activate"
|
||||
source "$script_dir/$venv/bin/activate"
|
||||
|
||||
pip install -r "$script_dir/$reqs_file"
|
||||
exit_code=$?
|
||||
|
||||
deactivate
|
||||
exit $exit_code
|
|
@ -1,29 +0,0 @@
|
|||
@echo off
|
||||
cd /d "%~dp0"
|
||||
|
||||
set "ROOT=%cd%"
|
||||
set "VENV=%ROOT%\.env-win"
|
||||
set "REQS_FILE=%ROOT%\requirements.txt"
|
||||
|
||||
set /a "LAST_ERR_CODE=0"
|
||||
|
||||
if exist "%VENV%" (
|
||||
rmdir /s /q "%VENV%"
|
||||
)
|
||||
|
||||
python -m venv "%VENV%" || (
|
||||
set /a "LAST_ERR_CODE=1"
|
||||
goto :end_script
|
||||
)
|
||||
|
||||
timeout /t 1 /nobreak
|
||||
|
||||
call "%VENV%\Scripts\activate.bat"
|
||||
pip install -r "%REQS_FILE%"
|
||||
set /a "LAST_ERR_CODE=%ERRORLEVEL%"
|
||||
call "%VENV%\Scripts\deactivate.bat"
|
||||
|
||||
goto :end_script
|
||||
|
||||
:end_script
|
||||
exit /b %LAST_ERR_CODE%
|
|
@ -1,4 +0,0 @@
|
|||
steam[client] @ git+https://github.com/detiam/steam_websocket@b8239912e6a190f490aede529c08b5049096bdc8
|
||||
pyinstaller
|
||||
requests
|
||||
certifi
|
|
@ -1,164 +0,0 @@
|
|||
import vdf
|
||||
import sys
|
||||
import os
|
||||
import json
|
||||
import copy
|
||||
import traceback
|
||||
|
||||
|
||||
STAT_TYPE_INT = '1'
|
||||
STAT_TYPE_FLOAT = '2'
|
||||
STAT_TYPE_AVGRATE = '3'
|
||||
STAT_TYPE_BITS = '4'
|
||||
|
||||
def generate_stats_achievements(
|
||||
schema, config_directory
|
||||
) -> tuple[list[dict], list[dict], bool, bool]:
|
||||
schema = vdf.binary_loads(schema)
|
||||
# print(schema)
|
||||
achievements_out : list[dict] = []
|
||||
stats_out : list[dict] = []
|
||||
|
||||
for appid in schema:
|
||||
sch = schema[appid]
|
||||
stat_info = sch['stats']
|
||||
for s in stat_info:
|
||||
stat = stat_info[s]
|
||||
if stat['type'] == STAT_TYPE_BITS:
|
||||
achs = stat['bits']
|
||||
for ach_num in achs:
|
||||
out = {}
|
||||
ach = achs[ach_num]
|
||||
out['hidden'] = 0
|
||||
for x in ach['display']:
|
||||
value = ach['display'][x]
|
||||
if f'{x}'.lower() == 'name':
|
||||
x = 'displayName'
|
||||
elif f'{x}'.lower() == 'desc':
|
||||
x = 'description'
|
||||
elif x == 'Hidden' or f'{x}'.lower() == 'hidden':
|
||||
x = 'hidden'
|
||||
try:
|
||||
value = int(value)
|
||||
except Exception as e:
|
||||
pass
|
||||
out[x] = value
|
||||
out['name'] = ach['name']
|
||||
if 'progress' in ach:
|
||||
out['progress'] = ach['progress']
|
||||
achievements_out += [out]
|
||||
else:
|
||||
out = {}
|
||||
out['default'] = 0
|
||||
out['name'] = stat['name']
|
||||
if 'min' in stat:
|
||||
out['min'] = stat['min']
|
||||
if stat['type'] == STAT_TYPE_INT:
|
||||
out['type'] = 'int'
|
||||
elif stat['type'] == STAT_TYPE_FLOAT:
|
||||
out['type'] = 'float'
|
||||
elif stat['type'] == STAT_TYPE_AVGRATE:
|
||||
out['type'] = 'avgrate'
|
||||
if 'Default' in stat:
|
||||
out['default'] = stat['Default']
|
||||
elif 'default' in stat:
|
||||
out['default'] = stat['default']
|
||||
|
||||
stats_out += [out]
|
||||
#print(stat_info[s])
|
||||
|
||||
copy_default_unlocked_img = False
|
||||
copy_default_locked_img = False
|
||||
output_ach = copy.deepcopy(achievements_out)
|
||||
for out_ach in output_ach:
|
||||
icon = out_ach.get("icon", None)
|
||||
if icon:
|
||||
out_ach["icon"] = f"img/{icon}"
|
||||
else:
|
||||
out_ach["icon"] = r'img/steam_default_icon_unlocked.jpg'
|
||||
copy_default_unlocked_img = True
|
||||
|
||||
icon_gray = out_ach.get("icon_gray", None)
|
||||
if icon_gray:
|
||||
out_ach["icon_gray"] = f"img/{icon_gray}"
|
||||
else:
|
||||
out_ach["icon_gray"] = r'img/steam_default_icon_locked.jpg'
|
||||
copy_default_locked_img = True
|
||||
|
||||
icongray = out_ach.get("icongray", None)
|
||||
if icongray:
|
||||
out_ach["icongray"] = f"{icongray}"
|
||||
|
||||
output_stats : list[str] = []
|
||||
for s in stats_out:
|
||||
default_num = 0
|
||||
if f"{s['type']}".lower() == 'int':
|
||||
try:
|
||||
default_num = int(s['default'])
|
||||
except ValueError:
|
||||
try:
|
||||
default_num = int(float(s['default']))
|
||||
except ValueError:
|
||||
# we set this to min if someone is failed to set to a fucking int value. and after this and still throwing error I gonna throw the dev out of the windows whoever misstyped that!!!
|
||||
# fixes 282800 | STAT_OJ46_C12 (<---THIS ONE)
|
||||
if 'min' in s:
|
||||
default_num = int(s['min'])
|
||||
else:
|
||||
raise ValueError('min not exist in (s) and no way to get the data. please report with the appid')
|
||||
else:
|
||||
default_num = float(s['default'])
|
||||
output_stats.append(f"{s['name']}={s['type']}={default_num}\n")
|
||||
|
||||
# print(output_ach)
|
||||
# print(output_stats)
|
||||
|
||||
if not os.path.exists(config_directory):
|
||||
os.makedirs(config_directory)
|
||||
|
||||
if output_ach:
|
||||
with open(os.path.join(config_directory, "achievements.json"), 'wt', encoding='utf-8') as f:
|
||||
json.dump(output_ach, f, indent=2)
|
||||
|
||||
if output_stats:
|
||||
with open(os.path.join(config_directory, "stats.txt"), 'wt', encoding='utf-8') as f:
|
||||
f.writelines(output_stats)
|
||||
|
||||
return (achievements_out, stats_out,
|
||||
copy_default_unlocked_img, copy_default_locked_img)
|
||||
|
||||
def help():
|
||||
exe_name = os.path.basename(sys.argv[0])
|
||||
print(f"\nUsage: {exe_name} UserGameStatsSchema_480.bin [UserGameStatsSchema_2370.bin] ... ")
|
||||
print(f" Example: {exe_name} UserGameStatsSchema_480.bin")
|
||||
print(f" Example: {exe_name} UserGameStatsSchema_480.bin UserGameStatsSchema_2370.bin")
|
||||
print("\nAt least 1 .bin file must be provided\n")
|
||||
|
||||
if __name__ == '__main__':
|
||||
if len(sys.argv) < 2:
|
||||
help()
|
||||
sys.exit(1)
|
||||
|
||||
for bin_file in sys.argv[1:]:
|
||||
try:
|
||||
print(f"parsing schema file '{bin_file}'")
|
||||
schema: bytes = b''
|
||||
with open(bin_file, 'rb') as f:
|
||||
schema = f.read()
|
||||
if schema:
|
||||
filename = os.path.basename(bin_file)
|
||||
outdir = os.path.join(f"{filename}_output", "steam_settings")
|
||||
print(f"output dir: '{outdir}'")
|
||||
generate_stats_achievements(schema, outdir)
|
||||
else:
|
||||
print("[X] couldn't load file", file=sys.stderr)
|
||||
|
||||
print('**********************************\n')
|
||||
except Exception as e:
|
||||
print("Unexpected error:")
|
||||
print(e)
|
||||
print("-----------------------")
|
||||
for line in traceback.format_exception(e):
|
||||
print(line)
|
||||
print('xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx\n')
|
||||
|
||||
sys.exit(0)
|
|
@ -1,521 +0,0 @@
|
|||
"""
|
||||
Module for deserializing/serializing to and from VDF
|
||||
"""
|
||||
__version__ = "3.4"
|
||||
__author__ = "Rossen Georgiev"
|
||||
|
||||
import re
|
||||
import sys
|
||||
import struct
|
||||
from binascii import crc32
|
||||
from io import BytesIO
|
||||
from io import StringIO as unicodeIO
|
||||
|
||||
try:
|
||||
from collections.abc import Mapping
|
||||
except:
|
||||
from collections import Mapping
|
||||
|
||||
from vdf.vdict import VDFDict
|
||||
|
||||
# Py2 & Py3 compatibility
|
||||
if sys.version_info[0] >= 3:
|
||||
string_type = str
|
||||
int_type = int
|
||||
BOMS = '\ufffe\ufeff'
|
||||
|
||||
def strip_bom(line):
|
||||
return line.lstrip(BOMS)
|
||||
else:
|
||||
from StringIO import StringIO as strIO
|
||||
string_type = basestring
|
||||
int_type = long
|
||||
BOMS = '\xef\xbb\xbf\xff\xfe\xfe\xff'
|
||||
BOMS_UNICODE = '\\ufffe\\ufeff'.decode('unicode-escape')
|
||||
|
||||
def strip_bom(line):
|
||||
return line.lstrip(BOMS if isinstance(line, str) else BOMS_UNICODE)
|
||||
|
||||
# string escaping
|
||||
_unescape_char_map = {
|
||||
r"\n": "\n",
|
||||
r"\t": "\t",
|
||||
r"\v": "\v",
|
||||
r"\b": "\b",
|
||||
r"\r": "\r",
|
||||
r"\f": "\f",
|
||||
r"\a": "\a",
|
||||
r"\\": "\\",
|
||||
r"\?": "?",
|
||||
r"\"": "\"",
|
||||
r"\'": "\'",
|
||||
}
|
||||
_escape_char_map = {v: k for k, v in _unescape_char_map.items()}
|
||||
|
||||
def _re_escape_match(m):
|
||||
return _escape_char_map[m.group()]
|
||||
|
||||
def _re_unescape_match(m):
|
||||
return _unescape_char_map[m.group()]
|
||||
|
||||
def _escape(text):
|
||||
return re.sub(r"[\n\t\v\b\r\f\a\\\?\"']", _re_escape_match, text)
|
||||
|
||||
def _unescape(text):
|
||||
return re.sub(r"(\\n|\\t|\\v|\\b|\\r|\\f|\\a|\\\\|\\\?|\\\"|\\')", _re_unescape_match, text)
|
||||
|
||||
# parsing and dumping for KV1
|
||||
def parse(fp, mapper=dict, merge_duplicate_keys=True, escaped=True):
|
||||
"""
|
||||
Deserialize ``s`` (a ``str`` or ``unicode`` instance containing a VDF)
|
||||
to a Python object.
|
||||
|
||||
``mapper`` specifies the Python object used after deserializetion. ``dict` is
|
||||
used by default. Alternatively, ``collections.OrderedDict`` can be used if you
|
||||
wish to preserve key order. Or any object that acts like a ``dict``.
|
||||
|
||||
``merge_duplicate_keys`` when ``True`` will merge multiple KeyValue lists with the
|
||||
same key into one instead of overwriting. You can se this to ``False`` if you are
|
||||
using ``VDFDict`` and need to preserve the duplicates.
|
||||
"""
|
||||
if not issubclass(mapper, Mapping):
|
||||
raise TypeError("Expected mapper to be subclass of dict, got %s" % type(mapper))
|
||||
if not hasattr(fp, 'readline'):
|
||||
raise TypeError("Expected fp to be a file-like object supporting line iteration")
|
||||
|
||||
stack = [mapper()]
|
||||
expect_bracket = False
|
||||
|
||||
re_keyvalue = re.compile(r'^("(?P<qkey>(?:\\.|[^\\"])*)"|(?P<key>#?[a-z0-9\-\_\\\?$%<>]+))'
|
||||
r'([ \t]*('
|
||||
r'"(?P<qval>(?:\\.|[^\\"])*)(?P<vq_end>")?'
|
||||
r'|(?P<val>(?:(?<!/)/(?!/)|[a-z0-9\-\_\\\?\*\.$<> ])+)'
|
||||
r'|(?P<sblock>{[ \t]*)(?P<eblock>})?'
|
||||
r'))?',
|
||||
flags=re.I)
|
||||
|
||||
for lineno, line in enumerate(fp, 1):
|
||||
if lineno == 1:
|
||||
line = strip_bom(line)
|
||||
|
||||
line = line.lstrip()
|
||||
|
||||
# skip empty and comment lines
|
||||
if line == "" or line[0] == '/':
|
||||
continue
|
||||
|
||||
# one level deeper
|
||||
if line[0] == "{":
|
||||
expect_bracket = False
|
||||
continue
|
||||
|
||||
if expect_bracket:
|
||||
raise SyntaxError("vdf.parse: expected openning bracket",
|
||||
(getattr(fp, 'name', '<%s>' % fp.__class__.__name__), lineno, 1, line))
|
||||
|
||||
# one level back
|
||||
if line[0] == "}":
|
||||
if len(stack) > 1:
|
||||
stack.pop()
|
||||
continue
|
||||
|
||||
raise SyntaxError("vdf.parse: one too many closing parenthasis",
|
||||
(getattr(fp, 'name', '<%s>' % fp.__class__.__name__), lineno, 0, line))
|
||||
|
||||
# parse keyvalue pairs
|
||||
while True:
|
||||
match = re_keyvalue.match(line)
|
||||
|
||||
if not match:
|
||||
try:
|
||||
line += next(fp)
|
||||
continue
|
||||
except StopIteration:
|
||||
raise SyntaxError("vdf.parse: unexpected EOF (open key quote?)",
|
||||
(getattr(fp, 'name', '<%s>' % fp.__class__.__name__), lineno, 0, line))
|
||||
|
||||
key = match.group('key') if match.group('qkey') is None else match.group('qkey')
|
||||
val = match.group('qval')
|
||||
if val is None:
|
||||
val = match.group('val')
|
||||
if val is not None:
|
||||
val = val.rstrip()
|
||||
if val == "":
|
||||
val = None
|
||||
|
||||
if escaped:
|
||||
key = _unescape(key)
|
||||
|
||||
# we have a key with value in parenthesis, so we make a new dict obj (level deeper)
|
||||
if val is None:
|
||||
if merge_duplicate_keys and key in stack[-1]:
|
||||
_m = stack[-1][key]
|
||||
# we've descended a level deeper, if value is str, we have to overwrite it to mapper
|
||||
if not isinstance(_m, mapper):
|
||||
_m = stack[-1][key] = mapper()
|
||||
else:
|
||||
_m = mapper()
|
||||
stack[-1][key] = _m
|
||||
|
||||
if match.group('eblock') is None:
|
||||
# only expect a bracket if it's not already closed or on the same line
|
||||
stack.append(_m)
|
||||
if match.group('sblock') is None:
|
||||
expect_bracket = True
|
||||
|
||||
# we've matched a simple keyvalue pair, map it to the last dict obj in the stack
|
||||
else:
|
||||
# if the value is line consume one more line and try to match again,
|
||||
# until we get the KeyValue pair
|
||||
if match.group('vq_end') is None and match.group('qval') is not None:
|
||||
try:
|
||||
line += next(fp)
|
||||
continue
|
||||
except StopIteration:
|
||||
raise SyntaxError("vdf.parse: unexpected EOF (open quote for value?)",
|
||||
(getattr(fp, 'name', '<%s>' % fp.__class__.__name__), lineno, 0, line))
|
||||
|
||||
stack[-1][key] = _unescape(val) if escaped else val
|
||||
|
||||
# exit the loop
|
||||
break
|
||||
|
||||
if len(stack) != 1:
|
||||
raise SyntaxError("vdf.parse: unclosed parenthasis or quotes (EOF)",
|
||||
(getattr(fp, 'name', '<%s>' % fp.__class__.__name__), lineno, 0, line))
|
||||
|
||||
return stack.pop()
|
||||
|
||||
|
||||
def loads(s, **kwargs):
|
||||
"""
|
||||
Deserialize ``s`` (a ``str`` or ``unicode`` instance containing a JSON
|
||||
document) to a Python object.
|
||||
"""
|
||||
if not isinstance(s, string_type):
|
||||
raise TypeError("Expected s to be a str, got %s" % type(s))
|
||||
|
||||
try:
|
||||
fp = unicodeIO(s)
|
||||
except TypeError:
|
||||
fp = strIO(s)
|
||||
|
||||
return parse(fp, **kwargs)
|
||||
|
||||
|
||||
def load(fp, **kwargs):
|
||||
"""
|
||||
Deserialize ``fp`` (a ``.readline()``-supporting file-like object containing
|
||||
a JSON document) to a Python object.
|
||||
"""
|
||||
return parse(fp, **kwargs)
|
||||
|
||||
|
||||
def dumps(obj, pretty=False, escaped=True):
|
||||
"""
|
||||
Serialize ``obj`` to a VDF formatted ``str``.
|
||||
"""
|
||||
if not isinstance(obj, Mapping):
|
||||
raise TypeError("Expected data to be an instance of``dict``")
|
||||
if not isinstance(pretty, bool):
|
||||
raise TypeError("Expected pretty to be of type bool")
|
||||
if not isinstance(escaped, bool):
|
||||
raise TypeError("Expected escaped to be of type bool")
|
||||
|
||||
return ''.join(_dump_gen(obj, pretty, escaped))
|
||||
|
||||
|
||||
def dump(obj, fp, pretty=False, escaped=True):
|
||||
"""
|
||||
Serialize ``obj`` as a VDF formatted stream to ``fp`` (a
|
||||
``.write()``-supporting file-like object).
|
||||
"""
|
||||
if not isinstance(obj, Mapping):
|
||||
raise TypeError("Expected data to be an instance of``dict``")
|
||||
if not hasattr(fp, 'write'):
|
||||
raise TypeError("Expected fp to have write() method")
|
||||
if not isinstance(pretty, bool):
|
||||
raise TypeError("Expected pretty to be of type bool")
|
||||
if not isinstance(escaped, bool):
|
||||
raise TypeError("Expected escaped to be of type bool")
|
||||
|
||||
for chunk in _dump_gen(obj, pretty, escaped):
|
||||
fp.write(chunk)
|
||||
|
||||
|
||||
def _dump_gen(data, pretty=False, escaped=True, level=0):
|
||||
indent = "\t"
|
||||
line_indent = ""
|
||||
|
||||
if pretty:
|
||||
line_indent = indent * level
|
||||
|
||||
for key, value in data.items():
|
||||
if escaped and isinstance(key, string_type):
|
||||
key = _escape(key)
|
||||
|
||||
if isinstance(value, Mapping):
|
||||
yield '%s"%s"\n%s{\n' % (line_indent, key, line_indent)
|
||||
for chunk in _dump_gen(value, pretty, escaped, level+1):
|
||||
yield chunk
|
||||
yield "%s}\n" % line_indent
|
||||
else:
|
||||
if escaped and isinstance(value, string_type):
|
||||
value = _escape(value)
|
||||
|
||||
yield '%s"%s" "%s"\n' % (line_indent, key, value)
|
||||
|
||||
|
||||
# binary VDF
|
||||
class BASE_INT(int_type):
|
||||
def __repr__(self):
|
||||
return "%s(%d)" % (self.__class__.__name__, self)
|
||||
|
||||
class UINT_64(BASE_INT):
|
||||
pass
|
||||
|
||||
class INT_64(BASE_INT):
|
||||
pass
|
||||
|
||||
class POINTER(BASE_INT):
|
||||
pass
|
||||
|
||||
class COLOR(BASE_INT):
|
||||
pass
|
||||
|
||||
BIN_NONE = b'\x00'
|
||||
BIN_STRING = b'\x01'
|
||||
BIN_INT32 = b'\x02'
|
||||
BIN_FLOAT32 = b'\x03'
|
||||
BIN_POINTER = b'\x04'
|
||||
BIN_WIDESTRING = b'\x05'
|
||||
BIN_COLOR = b'\x06'
|
||||
BIN_UINT64 = b'\x07'
|
||||
BIN_END = b'\x08'
|
||||
BIN_INT64 = b'\x0A'
|
||||
BIN_END_ALT = b'\x0B'
|
||||
|
||||
def binary_loads(b, mapper=dict, merge_duplicate_keys=True, alt_format=False, raise_on_remaining=True):
|
||||
"""
|
||||
Deserialize ``b`` (``bytes`` containing a VDF in "binary form")
|
||||
to a Python object.
|
||||
|
||||
``mapper`` specifies the Python object used after deserializetion. ``dict` is
|
||||
used by default. Alternatively, ``collections.OrderedDict`` can be used if you
|
||||
wish to preserve key order. Or any object that acts like a ``dict``.
|
||||
|
||||
``merge_duplicate_keys`` when ``True`` will merge multiple KeyValue lists with the
|
||||
same key into one instead of overwriting. You can se this to ``False`` if you are
|
||||
using ``VDFDict`` and need to preserve the duplicates.
|
||||
"""
|
||||
if not isinstance(b, bytes):
|
||||
raise TypeError("Expected s to be bytes, got %s" % type(b))
|
||||
|
||||
return binary_load(BytesIO(b), mapper, merge_duplicate_keys, alt_format, raise_on_remaining)
|
||||
|
||||
def binary_load(fp, mapper=dict, merge_duplicate_keys=True, alt_format=False, raise_on_remaining=False):
|
||||
"""
|
||||
Deserialize ``fp`` (a ``.read()``-supporting file-like object containing
|
||||
binary VDF) to a Python object.
|
||||
|
||||
``mapper`` specifies the Python object used after deserializetion. ``dict` is
|
||||
used by default. Alternatively, ``collections.OrderedDict`` can be used if you
|
||||
wish to preserve key order. Or any object that acts like a ``dict``.
|
||||
|
||||
``merge_duplicate_keys`` when ``True`` will merge multiple KeyValue lists with the
|
||||
same key into one instead of overwriting. You can se this to ``False`` if you are
|
||||
using ``VDFDict`` and need to preserve the duplicates.
|
||||
"""
|
||||
if not hasattr(fp, 'read') or not hasattr(fp, 'tell') or not hasattr(fp, 'seek'):
|
||||
raise TypeError("Expected fp to be a file-like object with tell()/seek() and read() returning bytes")
|
||||
if not issubclass(mapper, Mapping):
|
||||
raise TypeError("Expected mapper to be subclass of dict, got %s" % type(mapper))
|
||||
|
||||
# helpers
|
||||
int32 = struct.Struct('<i')
|
||||
uint64 = struct.Struct('<Q')
|
||||
int64 = struct.Struct('<q')
|
||||
float32 = struct.Struct('<f')
|
||||
|
||||
def read_string(fp, wide=False):
|
||||
buf, end = b'', -1
|
||||
offset = fp.tell()
|
||||
|
||||
# locate string end
|
||||
while end == -1:
|
||||
chunk = fp.read(64)
|
||||
|
||||
if chunk == b'':
|
||||
raise SyntaxError("Unterminated cstring (offset: %d)" % offset)
|
||||
|
||||
buf += chunk
|
||||
end = buf.find(b'\x00\x00' if wide else b'\x00')
|
||||
|
||||
if wide:
|
||||
end += end % 2
|
||||
|
||||
# rewind fp
|
||||
fp.seek(end - len(buf) + (2 if wide else 1), 1)
|
||||
|
||||
# decode string
|
||||
result = buf[:end]
|
||||
|
||||
if wide:
|
||||
result = result.decode('utf-16')
|
||||
elif bytes is not str:
|
||||
result = result.decode('utf-8', 'replace')
|
||||
else:
|
||||
try:
|
||||
result.decode('ascii')
|
||||
except:
|
||||
result = result.decode('utf-8', 'replace')
|
||||
|
||||
return result
|
||||
|
||||
stack = [mapper()]
|
||||
CURRENT_BIN_END = BIN_END if not alt_format else BIN_END_ALT
|
||||
|
||||
for t in iter(lambda: fp.read(1), b''):
|
||||
if t == CURRENT_BIN_END:
|
||||
if len(stack) > 1:
|
||||
stack.pop()
|
||||
continue
|
||||
break
|
||||
|
||||
key = read_string(fp)
|
||||
|
||||
if t == BIN_NONE:
|
||||
if merge_duplicate_keys and key in stack[-1]:
|
||||
_m = stack[-1][key]
|
||||
else:
|
||||
_m = mapper()
|
||||
stack[-1][key] = _m
|
||||
stack.append(_m)
|
||||
elif t == BIN_STRING:
|
||||
stack[-1][key] = read_string(fp)
|
||||
elif t == BIN_WIDESTRING:
|
||||
stack[-1][key] = read_string(fp, wide=True)
|
||||
elif t in (BIN_INT32, BIN_POINTER, BIN_COLOR):
|
||||
val = int32.unpack(fp.read(int32.size))[0]
|
||||
|
||||
if t == BIN_POINTER:
|
||||
val = POINTER(val)
|
||||
elif t == BIN_COLOR:
|
||||
val = COLOR(val)
|
||||
|
||||
stack[-1][key] = val
|
||||
elif t == BIN_UINT64:
|
||||
stack[-1][key] = UINT_64(uint64.unpack(fp.read(int64.size))[0])
|
||||
elif t == BIN_INT64:
|
||||
stack[-1][key] = INT_64(int64.unpack(fp.read(int64.size))[0])
|
||||
elif t == BIN_FLOAT32:
|
||||
stack[-1][key] = float32.unpack(fp.read(float32.size))[0]
|
||||
else:
|
||||
raise SyntaxError("Unknown data type at offset %d: %s" % (fp.tell() - 1, repr(t)))
|
||||
|
||||
if len(stack) != 1:
|
||||
raise SyntaxError("Reached EOF, but Binary VDF is incomplete")
|
||||
if raise_on_remaining and fp.read(1) != b'':
|
||||
fp.seek(-1, 1)
|
||||
raise SyntaxError("Binary VDF ended at offset %d, but there is more data remaining" % (fp.tell() - 1))
|
||||
|
||||
return stack.pop()
|
||||
|
||||
def binary_dumps(obj, alt_format=False):
|
||||
"""
|
||||
Serialize ``obj`` to a binary VDF formatted ``bytes``.
|
||||
"""
|
||||
buf = BytesIO()
|
||||
binary_dump(obj, buf, alt_format)
|
||||
return buf.getvalue()
|
||||
|
||||
def binary_dump(obj, fp, alt_format=False):
|
||||
"""
|
||||
Serialize ``obj`` to a binary VDF formatted ``bytes`` and write it to ``fp`` filelike object
|
||||
"""
|
||||
if not isinstance(obj, Mapping):
|
||||
raise TypeError("Expected obj to be type of Mapping")
|
||||
if not hasattr(fp, 'write'):
|
||||
raise TypeError("Expected fp to have write() method")
|
||||
|
||||
for chunk in _binary_dump_gen(obj, alt_format=alt_format):
|
||||
fp.write(chunk)
|
||||
|
||||
def _binary_dump_gen(obj, level=0, alt_format=False):
|
||||
if level == 0 and len(obj) == 0:
|
||||
return
|
||||
|
||||
int32 = struct.Struct('<i')
|
||||
uint64 = struct.Struct('<Q')
|
||||
int64 = struct.Struct('<q')
|
||||
float32 = struct.Struct('<f')
|
||||
|
||||
for key, value in obj.items():
|
||||
if isinstance(key, string_type):
|
||||
key = key.encode('utf-8')
|
||||
else:
|
||||
raise TypeError("dict keys must be of type str, got %s" % type(key))
|
||||
|
||||
if isinstance(value, Mapping):
|
||||
yield BIN_NONE + key + BIN_NONE
|
||||
for chunk in _binary_dump_gen(value, level+1, alt_format=alt_format):
|
||||
yield chunk
|
||||
elif isinstance(value, UINT_64):
|
||||
yield BIN_UINT64 + key + BIN_NONE + uint64.pack(value)
|
||||
elif isinstance(value, INT_64):
|
||||
yield BIN_INT64 + key + BIN_NONE + int64.pack(value)
|
||||
elif isinstance(value, string_type):
|
||||
try:
|
||||
value = value.encode('utf-8') + BIN_NONE
|
||||
yield BIN_STRING
|
||||
except:
|
||||
value = value.encode('utf-16') + BIN_NONE*2
|
||||
yield BIN_WIDESTRING
|
||||
yield key + BIN_NONE + value
|
||||
elif isinstance(value, float):
|
||||
yield BIN_FLOAT32 + key + BIN_NONE + float32.pack(value)
|
||||
elif isinstance(value, (COLOR, POINTER, int, int_type)):
|
||||
if isinstance(value, COLOR):
|
||||
yield BIN_COLOR
|
||||
elif isinstance(value, POINTER):
|
||||
yield BIN_POINTER
|
||||
else:
|
||||
yield BIN_INT32
|
||||
yield key + BIN_NONE
|
||||
yield int32.pack(value)
|
||||
else:
|
||||
raise TypeError("Unsupported type: %s" % type(value))
|
||||
|
||||
yield BIN_END if not alt_format else BIN_END_ALT
|
||||
|
||||
|
||||
def vbkv_loads(s, mapper=dict, merge_duplicate_keys=True):
|
||||
"""
|
||||
Deserialize ``s`` (``bytes`` containing a VBKV to a Python object.
|
||||
|
||||
``mapper`` specifies the Python object used after deserializetion. ``dict` is
|
||||
used by default. Alternatively, ``collections.OrderedDict`` can be used if you
|
||||
wish to preserve key order. Or any object that acts like a ``dict``.
|
||||
|
||||
``merge_duplicate_keys`` when ``True`` will merge multiple KeyValue lists with the
|
||||
same key into one instead of overwriting. You can se this to ``False`` if you are
|
||||
using ``VDFDict`` and need to preserve the duplicates.
|
||||
"""
|
||||
if s[:4] != b'VBKV':
|
||||
raise ValueError("Invalid header")
|
||||
|
||||
checksum, = struct.unpack('<i', s[4:8])
|
||||
|
||||
if checksum != crc32(s[8:]):
|
||||
raise ValueError("Invalid checksum")
|
||||
|
||||
return binary_loads(s[8:], mapper, merge_duplicate_keys, alt_format=True)
|
||||
|
||||
def vbkv_dumps(obj):
|
||||
"""
|
||||
Serialize ``obj`` to a VBKV formatted ``bytes``.
|
||||
"""
|
||||
data = b''.join(_binary_dump_gen(obj, alt_format=True))
|
||||
checksum = crc32(data)
|
||||
|
||||
return b'VBKV' + struct.pack('<i', checksum) + data
|
|
@ -1,221 +0,0 @@
|
|||
import sys
|
||||
from collections import Counter
|
||||
|
||||
if sys.version_info[0] >= 3:
|
||||
_iter_values = 'values'
|
||||
_range = range
|
||||
_string_type = str
|
||||
import collections.abc as _c
|
||||
class _kView(_c.KeysView):
|
||||
def __iter__(self):
|
||||
return self._mapping.iterkeys()
|
||||
class _vView(_c.ValuesView):
|
||||
def __iter__(self):
|
||||
return self._mapping.itervalues()
|
||||
class _iView(_c.ItemsView):
|
||||
def __iter__(self):
|
||||
return self._mapping.iteritems()
|
||||
else:
|
||||
_iter_values = 'itervalues'
|
||||
_range = xrange
|
||||
_string_type = basestring
|
||||
_kView = lambda x: list(x.iterkeys())
|
||||
_vView = lambda x: list(x.itervalues())
|
||||
_iView = lambda x: list(x.iteritems())
|
||||
|
||||
|
||||
class VDFDict(dict):
|
||||
def __init__(self, data=None):
|
||||
"""
|
||||
This is a dictionary that supports duplicate keys and preserves insert order
|
||||
|
||||
``data`` can be a ``dict``, or a sequence of key-value tuples. (e.g. ``[('key', 'value'),..]``)
|
||||
The only supported type for key is str.
|
||||
|
||||
Get/set duplicates is done by tuples ``(index, key)``, where index is the duplicate index
|
||||
for the specified key. (e.g. ``(0, 'key')``, ``(1, 'key')``...)
|
||||
|
||||
When the ``key`` is ``str``, instead of tuple, set will create a duplicate and get will look up ``(0, key)``
|
||||
"""
|
||||
self.__omap = []
|
||||
self.__kcount = Counter()
|
||||
|
||||
if data is not None:
|
||||
if not isinstance(data, (list, dict)):
|
||||
raise ValueError("Expected data to be list of pairs or dict, got %s" % type(data))
|
||||
self.update(data)
|
||||
|
||||
def __repr__(self):
|
||||
out = "%s(" % self.__class__.__name__
|
||||
out += "%s)" % repr(list(self.iteritems()))
|
||||
return out
|
||||
|
||||
def __len__(self):
|
||||
return len(self.__omap)
|
||||
|
||||
def _verify_key_tuple(self, key):
|
||||
if len(key) != 2:
|
||||
raise ValueError("Expected key tuple length to be 2, got %d" % len(key))
|
||||
if not isinstance(key[0], int):
|
||||
raise TypeError("Key index should be an int")
|
||||
if not isinstance(key[1], _string_type):
|
||||
raise TypeError("Key value should be a str")
|
||||
|
||||
def _normalize_key(self, key):
|
||||
if isinstance(key, _string_type):
|
||||
key = (0, key)
|
||||
elif isinstance(key, tuple):
|
||||
self._verify_key_tuple(key)
|
||||
else:
|
||||
raise TypeError("Expected key to be a str or tuple, got %s" % type(key))
|
||||
return key
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
if isinstance(key, _string_type):
|
||||
key = (self.__kcount[key], key)
|
||||
self.__omap.append(key)
|
||||
elif isinstance(key, tuple):
|
||||
self._verify_key_tuple(key)
|
||||
if key not in self:
|
||||
raise KeyError("%s doesn't exist" % repr(key))
|
||||
else:
|
||||
raise TypeError("Expected either a str or tuple for key")
|
||||
super(VDFDict, self).__setitem__(key, value)
|
||||
self.__kcount[key[1]] += 1
|
||||
|
||||
def __getitem__(self, key):
|
||||
return super(VDFDict, self).__getitem__(self._normalize_key(key))
|
||||
|
||||
def __delitem__(self, key):
|
||||
key = self._normalize_key(key)
|
||||
result = super(VDFDict, self).__delitem__(key)
|
||||
|
||||
start_idx = self.__omap.index(key)
|
||||
del self.__omap[start_idx]
|
||||
|
||||
dup_idx, skey = key
|
||||
self.__kcount[skey] -= 1
|
||||
tail_count = self.__kcount[skey] - dup_idx
|
||||
|
||||
if tail_count > 0:
|
||||
for idx in _range(start_idx, len(self.__omap)):
|
||||
if self.__omap[idx][1] == skey:
|
||||
oldkey = self.__omap[idx]
|
||||
newkey = (dup_idx, skey)
|
||||
super(VDFDict, self).__setitem__(newkey, self[oldkey])
|
||||
super(VDFDict, self).__delitem__(oldkey)
|
||||
self.__omap[idx] = newkey
|
||||
|
||||
dup_idx += 1
|
||||
tail_count -= 1
|
||||
if tail_count == 0:
|
||||
break
|
||||
|
||||
if self.__kcount[skey] == 0:
|
||||
del self.__kcount[skey]
|
||||
|
||||
return result
|
||||
|
||||
def __iter__(self):
|
||||
return iter(self.iterkeys())
|
||||
|
||||
def __contains__(self, key):
|
||||
return super(VDFDict, self).__contains__(self._normalize_key(key))
|
||||
|
||||
def __eq__(self, other):
|
||||
if isinstance(other, VDFDict):
|
||||
return list(self.items()) == list(other.items())
|
||||
else:
|
||||
return False
|
||||
|
||||
def __ne__(self, other):
|
||||
return not self.__eq__(other)
|
||||
|
||||
def clear(self):
|
||||
super(VDFDict, self).clear()
|
||||
self.__kcount.clear()
|
||||
self.__omap = list()
|
||||
|
||||
def get(self, key, *args):
|
||||
return super(VDFDict, self).get(self._normalize_key(key), *args)
|
||||
|
||||
def setdefault(self, key, default=None):
|
||||
if key not in self:
|
||||
self.__setitem__(key, default)
|
||||
return self.__getitem__(key)
|
||||
|
||||
def pop(self, key):
|
||||
key = self._normalize_key(key)
|
||||
value = self.__getitem__(key)
|
||||
self.__delitem__(key)
|
||||
return value
|
||||
|
||||
def popitem(self):
|
||||
if not self.__omap:
|
||||
raise KeyError("VDFDict is empty")
|
||||
key = self.__omap[-1]
|
||||
return key[1], self.pop(key)
|
||||
|
||||
def update(self, data=None, **kwargs):
|
||||
if isinstance(data, dict):
|
||||
data = data.items()
|
||||
elif not isinstance(data, list):
|
||||
raise TypeError("Expected data to be a list or dict, got %s" % type(data))
|
||||
|
||||
for key, value in data:
|
||||
self.__setitem__(key, value)
|
||||
|
||||
def iterkeys(self):
|
||||
return (key[1] for key in self.__omap)
|
||||
|
||||
def keys(self):
|
||||
return _kView(self)
|
||||
|
||||
def itervalues(self):
|
||||
return (self[key] for key in self.__omap)
|
||||
|
||||
def values(self):
|
||||
return _vView(self)
|
||||
|
||||
def iteritems(self):
|
||||
return ((key[1], self[key]) for key in self.__omap)
|
||||
|
||||
def items(self):
|
||||
return _iView(self)
|
||||
|
||||
def get_all_for(self, key):
|
||||
""" Returns all values of the given key """
|
||||
if not isinstance(key, _string_type):
|
||||
raise TypeError("Key needs to be a string.")
|
||||
return [self[(idx, key)] for idx in _range(self.__kcount[key])]
|
||||
|
||||
def remove_all_for(self, key):
|
||||
""" Removes all items with the given key """
|
||||
if not isinstance(key, _string_type):
|
||||
raise TypeError("Key need to be a string.")
|
||||
|
||||
for idx in _range(self.__kcount[key]):
|
||||
super(VDFDict, self).__delitem__((idx, key))
|
||||
|
||||
self.__omap = list(filter(lambda x: x[1] != key, self.__omap))
|
||||
|
||||
del self.__kcount[key]
|
||||
|
||||
def has_duplicates(self):
|
||||
"""
|
||||
Returns ``True`` if the dict contains keys with duplicates.
|
||||
Recurses through any all keys with value that is ``VDFDict``.
|
||||
"""
|
||||
for n in getattr(self.__kcount, _iter_values)():
|
||||
if n != 1:
|
||||
return True
|
||||
|
||||
def dict_recurse(obj):
|
||||
for v in getattr(obj, _iter_values)():
|
||||
if isinstance(v, VDFDict) and v.has_duplicates():
|
||||
return True
|
||||
elif isinstance(v, dict):
|
||||
return dict_recurse(v)
|
||||
return False
|
||||
|
||||
return dict_recurse(self)
|
Binary file not shown.
Before Width: | Height: | Size: 3.9 KiB |
Binary file not shown.
Before Width: | Height: | Size: 4.1 KiB |
Loading…
Add table
Reference in a new issue