5#include <zypp-core/zyppng/io/IODevice>
7#include <zypp-core/base/DtorReset>
9#include <zypp-media/MediaException>
10#include <zypp-media/FileCheckException>
11#include <zypp-media/CDTools>
24 , _workerPath( constants::DEFAULT_PROVIDE_WORKER_PATH.data() )
32 MIL <<
"Provider workdir is: " <<
_workDir << std::endl;
62 DBG <<
"Triggering the schedule timer (" <<
reasonStr <<
")" << std::endl;
73 MIL <<
"Provider is not started, NOT scheduling" << std::endl;
78 DBG_PRV <<
"Scheduling triggered during scheduling, returning immediately." << std::endl;
83#ifdef _SC_NPROCESSORS_ONLN
117 if ( (*iMedia)->refCount() > 1 ) {
118 MIL_PRV <<
"Not releasing media " << (*iMedia)->_name <<
" refcount is not zero" << std::endl;
124 if ( (*iMedia)->_idleSince && std::chrono::steady_clock::now() - (*iMedia)->_idleSince.value() >= std::chrono::hours(1) ) {
125 MIL <<
"Detaching medium " << (*iMedia)->_name <<
" for baseUrl " << (*iMedia)->_attachedUrl << std::endl;
129 MIL_PRV <<
"Not releasing media " << (*iMedia)->_name <<
" downloading worker and not timed out yet." << std::endl;
133 auto bQueue = (*iMedia)->_backingQueue.lock();
140 MIL <<
"Detaching medium " << (*iMedia)->_name <<
" for baseUrl " << (*iMedia)->_attachedUrl << std::endl;
145 ERR <<
"Could not send detach request, creating the request failed" << std::endl;
148 ERR <<
"Could not send detach request since no backing queue was defined" << std::endl;
157 const auto schedStart = std::chrono::steady_clock::now();
158 MIL_PRV <<
"Start scheduling" << std::endl;
161 const auto dur = std::chrono::steady_clock::now() -
schedStart;
162 MIL_PRV <<
"Exit scheduling after:" << std::chrono::duration_cast<std::chrono::milliseconds>(
dur ).count () << std::endl;
184 const auto &scheme =
queueIter->_schemeName;
192 MIL_PRV <<
"Start scheduling for scheme:" << scheme <<
" queue size is: " <<
queue.size() << std::endl;
196 ERR <<
"Scheme: " << scheme <<
" failed to return a valid configuration." << std::endl;
198 while(
queue.size() ) {
217 while (
i !=
queue.end() && !(*i) ) {
224 ProvideRequestRef
item = *
i;
228 if(
item->code() == ProvideMessage::Code::Attach ||
item->code() == ProvideMessage::Code::Detach ) {
235 MIL_PRV <<
"Trying to schedule request: " <<
item->urls().front() << std::endl;
251 for (
const auto &url :
item->urls() ) {
254 MIL <<
"Mirror URL " << url <<
" is incompatible with current scheme: " << scheme <<
", ignoring." << std::endl;
258 if(
item->owner()->canRedirectTo(
item, url ) )
261 MIL_PRV <<
"URL was rejected" << url << std::endl;
267 MIL <<
"Request has NO usable URLs" << std::endl;
302 MIL <<
"Current stats: " << std::endl;
311 MIL_PRV <<
"Reached maximum nr of connections, break" << std::endl;
320 MIL_PRV <<
"Free worker slots and available mirror URLs, starting a new worker" << std::endl;
327 if ( !
item->owner()->safeRedirectTo (
item, url ) )
331 if ( !
q->startup( scheme,
_workDir / scheme / url.getHost(), url.getHost() ) ) {
335 MIL_PRV <<
"Started worker for " << url.getHost() <<
" enqueing request" << std::endl;
337 item->setActiveUrl(url);
356 MIL_PRV <<
"No free worker slots, looking for the best existing worker" << std::endl;
361 if (
i->second->activeRequests () <
candidate->second->activeRequests () )
370 MIL_PRV <<
"Using existing worker " <<
candidate->first.getHost() <<
" to download request" << std::endl;
388 MIL_PRV <<
"No free worker slots, no slots in existing queues, trying to decomission idle queues." << std::endl;
401 if ( !
item->owner()->safeRedirectTo (
item, url ) )
405 if ( !
q->startup( scheme,
_workDir / scheme / url.getHost(), url.getHost() ) ) {
409 MIL_PRV <<
"Replaced worker for " << url.getHost() <<
", enqueing request" << std::endl;
411 item->setActiveUrl(url);
428 MIL_PRV <<
"End of line, deferring request for next try." << std::endl;
438 while (
i !=
queue.end() && !(*i) ) {
446 ProvideRequestRef
item = *
i;
450 if(
item->code() == ProvideMessage::Code::Attach ||
item->code() == ProvideMessage::Code::Detach ) {
452 if (
item->owner () )
457 MIL_PRV <<
"Trying to schedule request: " <<
item->urls().front() << std::endl;
475 MIL <<
"Mirror URL " <<
tmpurl <<
" is incompatible with current scheme: " << scheme <<
", ignoring." << std::endl;
484 MIL <<
"Request has NO usable URLs" << std::endl;
510 MIL <<
"Current stats: " << std::endl;
520 MIL_PRV <<
"Using existing idle worker to provide request" << std::endl;
522 item->owner()->redirectTo (
item, url );
523 item->setActiveUrl( url );
537 MIL_PRV <<
"Free CPU slots, starting a new worker" << std::endl;
540 item->owner()->redirectTo (
item, url );
543 if (
q->startup( scheme,
_workDir / scheme ) ) {
545 item->setActiveUrl(url);
563 MIL_PRV <<
"No free CPU slots, looking for the best existing worker" << std::endl;
568 if ( (*i)->activeRequests () < (*candidate)->activeRequests () )
573 item->owner()->redirectTo (
item, url );
575 MIL_PRV <<
"Using existing worker to provide request" << std::endl;
576 item->setActiveUrl( url );
577 (*candidate)->enqueue(
item );
587 MIL_PRV <<
"No free CPU slots, no slots in existing queues, trying to decomission idle queues." << std::endl;
595 item->owner()->redirectTo (
item, url );
598 if (
q->startup( scheme,
_workDir / scheme ) ) {
600 MIL_PRV <<
"Replaced worker, enqueing request" << std::endl;
602 item->setActiveUrl(url);
618 MIL_PRV <<
"No idle workers and no free CPU spots, wait for the next schedule run" << std::endl;
623 MIL_PRV <<
"End of line, deferring request for next try." << std::endl;
634 while (
i !=
queue.end() && !(*i) ) {
642 ProvideRequestRef
item = *
i;
643 MIL_PRV <<
"Trying to schedule request: " <<
item->urls().front() << std::endl;
650 MIL <<
"Mirror URL " <<
tmpurl <<
" is incompatible with current scheme: " << scheme <<
", ignoring." << std::endl;
659 MIL <<
"Request has NO usable URLs" << std::endl;
670 if ( !
q->startup( scheme,
_workDir / scheme ) ) {
671 ERR <<
"Worker startup failed!" << std::endl;
680 MIL_PRV <<
"Started worker, enqueing request" << std::endl;
684 MIL_PRV <<
"Found worker, enqueing request" << std::endl;
689 item->owner()->redirectTo (
item, url );
691 item->setActiveUrl(url);
699 std::list<ProvideItemRef> &ProvidePrivate::items()
706 return _credManagerOptions;
709 std::vector<zypp::Url> ProvidePrivate::sanitizeUrls(
const std::vector<zypp::Url> &urls)
712 std::optional<ProvideQueue::Config> scheme;
715 const auto &
s = schemeConfig( effectiveScheme(
mirrIt->getScheme() ) );
717 WAR <<
"URL: " << *
mirrIt <<
" is not supported, ignoring!" << std::endl;
724 if ( scheme->worker_type () ==
s->worker_type () ) {
727 WAR <<
"URL: " << *
mirrIt <<
" has different worker type than the primary URL: "<<
usableMirrs.front() <<
", ignoring!" << std::endl;
739 std::vector<AttachedMediaInfo_Ptr> &ProvidePrivate::attachedMediaInfos()
741 return _attachedMediaInfos;
746 if (
auto i = _schemeConfigs.find( scheme );
i != _schemeConfigs.end() ) {
751 if ( !
q.startup( scheme, _workDir / scheme ) ) {
754 auto newItem = _schemeConfigs.insert( std::make_pair( scheme,
q.workerConfig() ));
764 _fileCache.erase ( key );
771 i.first->second._deathTimer.reset();
772 return i.first->second._file;
776 return i.first->second._file;
782 return (_fileCache.count(key) > 0);
787 _items.push_back(
item );
788 schedule( ProvidePrivate::EnqueueItem );
793 auto elem = std::find_if( _items.begin(), _items.end(), [
item](
const auto &
i){ return i.get() == item; } );
794 if (
elem != _items.end() ) {
795 if ( _isScheduling ) {
803 std::string ProvidePrivate::nextMediaId()
const
809 AttachedMediaInfo_Ptr ProvidePrivate::addMedium( AttachedMediaInfo_Ptr &&
medium )
815 MIL_PRV <<
"Registered new media attachment with ID: " <<
medium->name() <<
" with mountPoint: (" <<
medium->_localMountPoint.value_or(
zypp::Pathname()) <<
")" << std::endl;
816 _attachedMediaInfos.push_back( std::move(
medium) );
818 return _attachedMediaInfos.back();
821 bool ProvidePrivate::queueRequest ( ProvideRequestRef req )
823 const auto &
schemeName = effectiveScheme( req->url().getScheme() );
825 return (qItem._schemeName == schemeName);
833 schedule( ProvidePrivate::EnqueueReq );
837 bool ProvidePrivate::dequeueRequest(ProvideRequestRef req , std::exception_ptr error)
839 auto queue = req->currentQueue ();
841 queue->cancel( req.get(), error );
845 for (
auto &
q : _queues ) {
846 auto elem = std::find(
q._requests.begin(),
q._requests.end(), req );
847 if (
elem !=
q._requests.end() ) {
848 q._requests.erase(
elem);
851 req->owner()->finishReq(
nullptr, req, error );
866 for (
const auto &
v : _workerQueues ) {
867 if (
v.second.get() == &
q )
873 bool ProvidePrivate::isRunning()
const
878 std::string ProvidePrivate::effectiveScheme(
const std::string &scheme)
const
881 if (
auto it = _workerAlias.find (
ss );
it != _workerAlias.end () ) {
887 void ProvidePrivate::onPulseTimeout(
Timer & )
889 DBG_PRV <<
"Pulse timeout" << std::endl;
891 auto now = std::chrono::steady_clock::now();
893 if ( _log ) _log->pulse();
896 for (
auto i = _fileCache.begin ();
i != _fileCache.end(); ) {
900 if ( now - *
cacheItem._deathTimer < std::chrono::seconds(20) ) {
901 MIL <<
"Releasing file " << *
i->second._file <<
" from cache, death timeout." << std::endl;
902 i = _fileCache.erase(
i);
907 cacheItem._deathTimer = std::chrono::steady_clock::now();
915 void ProvidePrivate::onQueueIdle()
917 if ( !_items.empty() )
919 for (
auto &[
k,
q] : _workerQueues ) {
930 if (
item.state() == ProvideItem::Finished ) {
932 auto i = std::find( _items.begin(), _items.end(),
itemRef );
933 if (
i == _items.end() ) {
934 ERR <<
"State of unknown Item changed, ignoring" << std::endl;
942 if ( _items.empty() )
946 uint32_t ProvidePrivate::nextRequestId()
949 return ++_nextRequestId;
953 : _parent( parent.weak_this<
Provide>() )
1011 std::vector<zypp::Url>
usableMirrs =
d->sanitizeUrls( urls );
1029 MIL <<
"Attaching lazy medium with label: [" <<
lazyHandle.spec().label() <<
"]" << std::endl;
1033 lazyHandle._sharedData->_mediaHandle = handle;
1048 std::vector<zypp::Url>
usableMirrs =
d->sanitizeUrls( urls );
1054 auto &attachedMedia =
d->attachedMediaInfos ();
1055 for (
auto &
medium : attachedMedia ) {
1063 return op->promise();
1071 return op->promise();
1082 const auto i = std::find(
d->_attachedMediaInfos.begin(),
d->_attachedMediaInfos.end(),
attachHandle.mediaInfo() );
1083 if (
i ==
d->_attachedMediaInfos.end() ) {
1105 return op->promise();
1116 return me->provide( handle,
fName, req);
1130 return expected<zypp::CheckSum>::success( zypp::CheckSum( algorithm, chksumRes.headers().value(algorithm).asString() ) );
1157 auto fName = source.file();
1159 | [
resSave = std::move(source) ] (
auto &&result ) {
1169 d->_isRunning =
true;
1170 d->_pulseTimer->start( 5000 );
1172 if (
d->_log )
d->_log->provideStart();
1177 d_func()->_workerPath = path;
1195 return d_func()->_workDir;
1201 return d->_credManagerOptions;
1206 d_func()->_credManagerOptions = opt;
1211 return d_func()->_sigIdle;
1216 return d_func()->_sigMediaChange;
1221 return d_func()->_sigAuthRequired;
1227 : _provider( parent )
1240 const auto &
fTime =
item.finishedTime();
1242 auto duration = std::chrono::duration_cast<std::chrono::seconds>(
item.finishedTime() -
item.startTime() );
1245 MIL <<
"Item finished after " << (
item.finishedTime() -
item.startTime()).
count() <<
" ns" << std::endl;
1252 MIL <<
"Item failed" << std::endl;
1271 for (
const auto &
i :
prov->d_func()->items() ) {
1285 const auto &
stats =
i->currentStats();
1288 ERR <<
"Bug! Stats should be initialized by now" << std::endl;
1303 const auto now = std::chrono::steady_clock::now();
1312 if (
sinceLast >= std::chrono::seconds(1) )
Reference counted access to a Tp object calling a custom Dispose function when the last AutoDispose h...
reference value() const
Reference to the Tp object.
bool unique() const
Returns true if this is the only AutoDispose instance managing the current data object.
Store and operate with byte count.
Assign a variable a certain value when going out of scope.
Base class for Exception.
std::string getScheme() const
Returns the scheme name of the URL.
void setAuthority(const std::string &authority)
Set the authority component in the URL.
void setPathName(const std::string &path, EEncoding eflag=zypp::url::E_DECODED)
Set the path name.
bool isValid() const
Verifies the Url.
void appendPathName(const Pathname &path_r, EEncoding eflag_r=zypp::url::E_DECODED)
Extend the path name.
void setScheme(const std::string &scheme)
Set the scheme name in the URL.
Wrapper class for stat/lstat.
bool empty() const
Test for an empty path.
Pathname realpath() const
Returns this path as the absolute canonical pathname.
static auto connect(typename internal::MemberFunction< SenderFunc >::ClassType &s, SenderFunc &&sFun, typename internal::MemberFunction< ReceiverFunc >::ClassType &recv, ReceiverFunc &&rFunc)
static ProvideFileItemRef create(const std::vector< zypp::Url > &urls, const ProvideFileSpec &request, ProvidePrivate &parent)
const std::string queueName(ProvideQueue &q) const
std::string effectiveScheme(const std::string &scheme) const
std::list< ProvideItemRef > _items
ProvidePrivate(zypp::Pathname &&workDir, Provide &pub)
Timer::Ptr _scheduleTrigger
std::unordered_map< std::string, ProvideQueueRef > _workerQueues
std::vector< AttachedMediaInfo_Ptr > _attachedMediaInfos
void onPulseTimeout(Timer &)
std::deque< QueueItem > _queues
void schedule(ScheduleReason reason)
expected< ProvideQueue::Config > schemeConfig(const std::string &scheme)
std::chrono::time_point< std::chrono::steady_clock > TimePoint
static expected< ProvideRequestRef > createDetach(const zypp::Url &url)
A ProvideRes object is a reference counted ownership of a resource in the cache provided by a Provide...
virtual void provideStart()
const Stats & stats() const
ProvideStatus(ProvideRef parent)
virtual void itemFailed(ProvideItem &item)
virtual void itemDone(ProvideItem &item)
AsyncOpRef< expected< ProvideRes > > provide(const std::vector< zypp::Url > &urls, const ProvideFileSpec &request)
const zypp::media::CredManagerOptions & credManangerOptions() const
static ProvideRef create(const zypp::Pathname &workDir="")
AsyncOpRef< expected< MediaHandle > > attachMedia(const std::vector< zypp::Url > &urls, const ProvideMediaSpec &request)
void setWorkerPath(const zypp::Pathname &path)
SignalProxy< std::optional< zypp::media::AuthData >(const zypp::Url &reqUrl, const std::string &triedUsername, const std::map< std::string, std::string > &extraValues) sigAuthRequired)()
SignalProxy< void()> sigIdle()
void setCredManagerOptions(const zypp::media::CredManagerOptions &opt)
Provide(const zypp::Pathname &workDir)
AsyncOpRef< expected< zypp::CheckSum > > checksumForFile(const zypp::Pathname &p, const std::string &algorithm)
void setStatusTracker(ProvideStatusRef tracker)
AsyncOpRef< expected< MediaHandle > > attachMediaIfNeeded(LazyMediaHandle lazyHandle)
bool ejectDevice(const std::string &queueRef, const std::string &device)
expected< LazyMediaHandle > prepareMedia(const std::vector< zypp::Url > &urls, const ProvideMediaSpec &request)
AsyncOpRef< expected< zypp::ManagedFile > > copyFile(const zypp::Pathname &source, const zypp::Pathname &target)
std::optional< Action > MediaChangeAction
const zypp::Pathname & providerWorkdir() const
SignalProxy< MediaChangeAction(const std::string &queueRef, const std::string &label, const int32_t mediaNr, const std::vector< std::string > &devices, const std::optional< std::string > &desc) sigMediaChangeRequested)()
ProvideMediaHandle MediaHandle
The Timer class provides repetitive and single-shot timers.
SignalProxy< void(Timer &t) sigExpired)()
This signal is always emitted when the timer expires.
static expected success(ConsParams &&...params)
String related utilities and Regular expression matching.
int unlink(const Pathname &path)
Like 'unlink'.
const std::string & asString(const std::string &t)
Global asString() that works with std::string too.
bool startsWith(const C_Str &str_r, const C_Str &prefix_r)
alias for hasPrefix
std::string stripSuffix(const C_Str &str_r, const C_Str &suffix_r)
Strip a suffix_r from str_r and return the resulting string.
Easy-to use interface to the ZYPP dependency resolver.
AutoDispose< const Pathname > ManagedFile
A Pathname plus associated cleanup code to be executed when path is no longer needed.
constexpr auto DEFAULT_ACTIVE_CONN_PER_HOST
constexpr auto DEFAULT_ACTIVE_CONN
constexpr auto DEFAULT_MAX_DYNAMIC_WORKERS
constexpr std::string_view ATTACHED_MEDIA_SUFFIX
std::conditional_t< isAsync, AsyncOpRef< T >, T > makeReadyResult(T &&result)
ResultType and_then(const expected< T, E > &exp, Function &&f)
bool provideDebugEnabled()
zypp::ByteCount _partialBytes
zypp::ByteCount _perSecondSinceLastPulse
zypp::ByteCount _perSecond
zypp::ByteCount _expectedBytes
std::chrono::steady_clock::time_point _startTime
zypp::ByteCount _finishedBytes
std::chrono::steady_clock::time_point _lastPulseTime
#define ZYPP_EXCPT_PTR(EXCPT)
Drops a logline and returns Exception as a std::exception_ptr.
#define ZYPP_FWD_CURRENT_EXCPT()
Drops a logline and returns the current Exception as a std::exception_ptr.
#define L_ENV_CONSTR_DEFINE_FUNC(ENV)
#define ZYPP_IMPL_PRIVATE(Class)