Wiki script - Dump sorgenti delle guide (Python 3)

Da Guide@Debianizzati.Org.

Debian-swirl.png Versioni Compatibili
Debian 8 "Jessie"
Debian testing (non recente)
Verificala con Stretch

Questo semplice script Python 3 si occupa di effettuare il download dei sorgenti delle guide di questo Wiki.
Il file creato si chiamerà "wikidump.txt" (o con qualsivoglia altro nome specificato nella variabile "NOMEFILE" del codice sorgente dello script) e, se si desidera, è possibile modificare il numero di thread (variabile "NUMTHREAD") in base al numero di core/CPU della propria macchina.

Salvare il codice in un file di testo con nome scelto a proprio piacimento e poi eseguirlo. Ad esempio, dopo averlo chiamato "wikidump.py":

$ python3 wikidump.py

Script

#!/usr/bin/python3

import urllib.parse
import urllib.error
import urllib.request
import json
import queue
from threading import Thread
from collections import OrderedDict

URL = 'http://guide.debianizzati.org/api.php'
NOMEFILE = 'wikidump.txt'
NUMTHREAD = 4

def api(parametri):
        
        # codifica parametri
        parametri_enc = urllib.parse.urlencode(parametri).encode('utf-8')
        # creazione URL
        url_req = urllib.request.Request(URL, parametri_enc)
        
        try:
            # dati risposta dal Wiki
            risposta = urllib.request.urlopen(url_req)
            # decodifica risposta
            risposta_dec = risposta.read().decode('utf-8')
            # decodifica in JSON convertendo gli interi in stringhe
            dati_json = json.loads(risposta_dec, parse_int=str)

            return dati_json
               
        except urllib.error.URLError as e:
            print('Errore di connessione al Wiki')
            print(e)


def get_pid_and_titles():
        # Restituisce un dizionario con i pageid (chiavi) e, come valori,
        # una lista di titolo + contenuto della guida (inizialmente vuoto)
        # esempio:
        # {
        #    'pageid1' : ['titolo1', ''],
        #    'pageid2' : ['titolo2', ''],
        #    'pageid3' : ['titolo3', ''],
        #    ...
        #    ...
        # }

        all_titles_params = {
                            'action' : 'query',
                            'list' : 'allpages',
                            'aplimit' : '500',
                            'apfilterredir' : 'nonredirects',
                            'format' : 'json',
                            'apfrom' : ''
                            }
        
        # dizionario ordinato
        mydict = OrderedDict()
        
        while True:
            dati = api(all_titles_params)

            for x in dati['query']['allpages']:
                mydict[x['pageid']] = [x['title'], '']

            # esiste un blocco successivo? Il limite di un blocco è 500.
            try:
                last = dati['query-continue']['allpages']['apfrom']

            except KeyError:
                break
            
            # se esiste un blocco successivo, allora 'apfrom' assume il valore corrispondente
            # al titolo della prima guida del blocco successivo
            all_titles_params['apfrom'] = ''.join(last)
        
        return mydict

    
def get_content(dizionario):
    # accetta in ingresso il dizionario incompleto (senza contenuto delle guide)
    # e restituisce il dizionario completo, cioè qualcosa del tipo:
    # {
    #    'pageid1' : ['titolo1', 'contenuto1'],
    #    'pageid2' : ['titolo2', 'contenuto2'],
    #    'pageid3' : ['titolo3', 'contenuto3'],
    #    ...
    #    ...
    # }
    
    # parametri
    content_ts_params = {
                        'action' : 'query',
                        'prop' : 'revisions',
                        'rvprop' : 'content',
                        'pageids' : '',
                        'format' : 'json'
                        }
    
    # lista delle chiavi del dizionario (pageids)
    lista_pageid = list(dizionario.keys())
    
    # converte la lista appena ottenuta in una lista di liste (di max 50 valori ciascuna in
    # quanto 50 è il max valore consentito dall'API del Wiki)
    # Qui viene usato il valore 40 per evitare possibili problemi
    listFifty = [lista_pageid[start:start + 40] for start in range(0, len(lista_pageid), 40)]
    
    q = queue.Queue()

    for elem in listFifty:
        # separatore tra gli elementi della lista è "|"
        # es. 1234, 5678, 9012 -> 1234|5678|9012
        listaid = "|".join(elem)
        # inserisce questa stringa in una coda
        q.put(listaid)
    
    def mythreaded_func():
        while not q.empty():
            # preleva un valore dalla coda
            mystr = q.get()
            # modifica il valore del dizionario
            content_ts_params['pageids'] = mystr
            
            d_reply = api(content_ts_params)
            
            for subelem in mystr.split('|'):            
                mycontent = d_reply['query']['pages'][subelem]['revisions'][0]['*']
                dizionario[subelem][1] = mycontent
            
            q.task_done()  
            
    for i in range(NUMTHREAD):
        t = Thread(target=mythreaded_func)
        t.start()
    
    q.join()

    return dizionario


def create_dict():
    # viene restituito il dizionario completo da inviare al file  
    
    my_partial_dict = get_pid_and_titles()
    my_full_dict = get_content(my_partial_dict)
      
    return my_full_dict


print('Download delle guide...')
mydict = create_dict()


print('Scrittura sul file...')
with open(NOMEFILE, 'w', encoding='utf8') as f:
    for k,v in mydict.items():
        # parsing di caratteri non consentiti in URL (i.e. il carattere "+")
        name_parsed = urllib.parse.quote(v[0])
        f.write('þþþþ\nguide.debianizzati.org/index.php?title=' + name_parsed + '&action=edit\n' + v[1] + '\nøøøø\n')

print("Script terminato")
        
Strumenti personali
Namespace
Varianti
Azioni
Navigazione
Risorse
Strumenti