Python多线程GET和POST读取URL内容

#coding=utf-8
import threading,redis
from time import ctime,sleep
import urllib.parse,urllib.request
import json
 
def http_get(url, data=None, encoding='utf-8'):
    """Perform a GET request and describe the response as a dict of strings.

    Args:
        url: Base URL to request.
        data: Optional mapping (or sequence of pairs) of query parameters.
            It is URL-encoded and appended to ``url``. ``None`` means no
            parameters.
        encoding: Codec used to decode the response body.

    Returns:
        A dict with the keys ``http_code``, ``http_result``, ``http_url``
        (the ``url`` argument as passed in), ``http_param`` (``str()`` of the
        parameters), ``http_info`` (response headers) and ``http_real_url``
        (the URL reported by the response). All values are strings.

    Only ``urllib.error.HTTPError`` is handled here; its status, body and
    headers are reported in the same dict shape. Any other exception raised
    by ``urlopen`` propagates to the caller.
    """
    params = {} if data is None else data
    query = urllib.parse.urlencode(params)

    if not query:
        # Nothing to append; avoid leaving a dangling '?' on the URL.
        full_url = url
    elif '?' not in url:
        full_url = url + '?' + query
    elif url.endswith(('?', '&')):
        full_url = url + query
    else:
        # The URL already carries a query string; extend it.
        full_url = url + '&' + query

    result = {}
    try:
        # Request the URL *with* the encoded parameters.
        with urllib.request.urlopen(full_url) as response:
            result['http_code'] = str(response.code)
            result['http_result'] = response.read().decode(encoding)
            result['http_url'] = str(url)
            result['http_param'] = str(params)
            result['http_info'] = str(response.info())
            result['http_real_url'] = str(response.geturl())
    except urllib.error.HTTPError as e:
        # An HTTPError is itself response-like, so report it the same way.
        result['http_code'] = str(e.code)
        result['http_result'] = e.read().decode(encoding)
        result['http_url'] = str(url)
        result['http_param'] = str(params)
        result['http_info'] = str(e.info())
        result['http_real_url'] = str(e.geturl())

    return result
 
def http_post(url, data=None, headers=None, encoding='utf-8'):
    """Perform a form-encoded POST request and describe the response as a dict.

    Args:
        url: URL to post to.
        data: Optional mapping (or sequence of pairs) of form fields. It is
            URL-encoded and sent as the request body. ``None`` means an
            empty form.
        headers: Optional mapping of request headers. ``None`` means none.
        encoding: Codec used to decode the response body.

    Returns:
        A dict with the keys ``http_code``, ``http_result`` (decoded body),
        ``http_url`` (the ``url`` argument), ``http_param`` (the form data
        object), ``http_info`` (response headers as a string) and
        ``http_real_url`` (the URL reported by the response).

    Only ``urllib.error.HTTPError`` is handled here; its status, body and
    headers are reported in the same dict shape. Any other exception raised
    by ``urlopen`` propagates to the caller.
    """
    # Fresh containers per call: a shared mutable default would be handed
    # back in the result and could be altered by one caller for all others.
    form = {} if data is None else data
    request_headers = {} if headers is None else headers

    # urlopen() requires the request body as bytes.
    body = urllib.parse.urlencode(form).encode('ascii')
    request_obj = urllib.request.Request(url, body, request_headers)

    result = {}
    try:
        with urllib.request.urlopen(request_obj) as response:
            result['http_code'] = response.code
            result['http_result'] = response.read().decode(encoding)
            result['http_url'] = url
            result['http_param'] = form
            result['http_info'] = str(response.info())
            result['http_real_url'] = response.geturl()
    except urllib.error.HTTPError as e:
        # An HTTPError is itself response-like, so report it the same way.
        result['http_code'] = e.code
        result['http_result'] = e.read().decode(encoding)
        result['http_url'] = url
        result['http_param'] = form
        result['http_info'] = str(e.info())
        result['http_real_url'] = e.geturl()

    return result
 
def test_http_get(num):
    """Issue a sample GET request through ``http_get`` and print the result.

    ``num`` is accepted so the function can be used as a thread target with
    an argument, but it is not used in the body.
    """
    endpoint = 'http://pv.sohu.com/cityjson'
    query = {'ip': '180.169.124.154'}

    # The response is decoded as GBK.
    print(http_get(endpoint, query, 'gbk'))
 
def test_http_post(num):
    """Issue a sample POST request through ``http_post`` and print the result.

    ``num`` is accepted so the function can be used as a thread target with
    an argument, but it is not used in the body.
    """
    endpoint = 'http://pv.sohu.com/cityjson'
    form = {'test': 'test'}
    request_headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64)',
    }

    # The response is decoded as GBK.
    print(http_post(endpoint, form, request_headers, 'gbk'))
 
# Single-threaded variant:
# for num in range(1):
#     test_http_post(num)
 
if __name__ == '__main__':
    # Start one worker thread per (num, i) pair, each running test_http_post,
    # and pause one second after every inner batch.
    for num in range(1):
        for i in range(1):
            label = str(num) + str(i)
            print(label)
            print("start listening thread..............")
            worker = threading.Thread(target=test_http_post, args=(label,))
            # The thread is left non-daemon (daemon mode is not enabled).
            worker.start()
            print("end  listening thread..............")
        sleep(1)


by 雪洁 2018-08-17 08:07:41 407 views
我来说几句

相关文章